From 37d7c27ba005b0a473a8c84a964890c503e0d503 Mon Sep 17 00:00:00 2001 From: Matiss Jurevics Date: Fri, 20 Feb 2026 14:00:00 +0000 Subject: [PATCH] Revert "feat(streams): add phase-2 SFU transport handshake and produce/consume APIs" This reverts commit 498a7f838b7747470b220701505c4bfbd3ea8cff. --- Backend/.env.example | 5 - Backend/README.md | 15 +- Backend/bun.lock | 40 +--- Backend/media/sfu/mediasoup.ts | 405 --------------------------------- Backend/media/sfu/noop.ts | 134 +---------- Backend/media/sfu/registry.ts | 116 +--------- Backend/media/sfu/service.ts | 10 +- Backend/media/sfu/types.ts | 84 +------ Backend/package.json | 6 +- Backend/public/mobile-sim.js | 349 ++-------------------------- Backend/routes/ops.ts | 16 +- Backend/routes/streams.ts | 280 ++--------------------- 12 files changed, 65 insertions(+), 1395 deletions(-) delete mode 100644 Backend/media/sfu/mediasoup.ts diff --git a/Backend/.env.example b/Backend/.env.example index 0d2a629..e12b2cf 100644 --- a/Backend/.env.example +++ b/Backend/.env.example @@ -12,15 +12,10 @@ MINIO_BUCKET=videos MINIO_REGION=us-east-1 MINIO_PRESIGNED_EXPIRY_SECONDS=600 MEDIA_MODE=legacy -MEDIA_SFU_ENGINE=mediasoup MEDIA_PROVIDER=mock TURN_URLS= TURN_USERNAME= TURN_CREDENTIAL= -MEDIA_WEBRTC_LISTEN_IP=127.0.0.1 -MEDIA_WEBRTC_ANNOUNCED_IP= -MEDIA_RTC_MIN_PORT=40000 -MEDIA_RTC_MAX_PORT=49999 MEDIA_RECORDINGS_DIR=media-recordings MEDIA_MAX_PUBLISHERS=4 MEDIA_MAX_SUBSCRIBERS_PER_ROOM=12 diff --git a/Backend/README.md b/Backend/README.md index 2784edd..7c5c9cc 100644 --- a/Backend/README.md +++ b/Backend/README.md @@ -34,12 +34,9 @@ Required env vars: | `BETTER_AUTH_BASE_URL` | Public base URL for the backend (e.g., `http://localhost:3000`) | | `BETTER_AUTH_TRUSTED_ORIGINS` | Comma-separated list of allowed frontend origins | | `PORT` | HTTP port (default `3000`) | -| `MEDIA_MODE` | Media runtime mode (`legacy` default, `single_server_sfu` for in-process SFU path) | -| `MEDIA_SFU_ENGINE` | SFU engine for `single_server_sfu` mode (`mediasoup` default, `noop` fallback) | +| `MEDIA_MODE` | Media runtime mode (`legacy` default, `single_server_sfu` scaffold mode) | | `MEDIA_PROVIDER` | Media backend provider (`mock` by default) | | `TURN_URLS` / `TURN_USERNAME` / `TURN_CREDENTIAL` | TURN/STUN configuration used by single-server SFU mode | -| `MEDIA_WEBRTC_LISTEN_IP` / `MEDIA_WEBRTC_ANNOUNCED_IP` | WebRTC transport bind/announce IPs for mediasoup SFU. Do not leave bind IP as `0.0.0.0` without an announced IP in non-local environments. | -| `MEDIA_RTC_MIN_PORT` / `MEDIA_RTC_MAX_PORT` | UDP/TCP RTP port range for mediasoup worker | | `MEDIA_RECORDINGS_DIR` | Local output directory for server-side recording workers (planned in SFU mode) | | `MEDIA_MAX_PUBLISHERS` / `MEDIA_MAX_SUBSCRIBERS_PER_ROOM` | Soft concurrency limits for single-server media mode (planned) | | `MINIO_*` | Connection settings for the MinIO/S3 endpoint | @@ -148,16 +145,10 @@ Stream realtime events: - Client receives `stream:started` when camera accepts. - Both devices receive `stream:ended` when session is closed. -Experimental SFU endpoints (`MEDIA_MODE=single_server_sfu`): +Experimental SFU scaffolding endpoints (`MEDIA_MODE=single_server_sfu`): - `GET /streams/:streamSessionId/sfu/session` – fetch in-memory SFU session state for participant devices -- `GET /streams/:streamSessionId/sfu/router-rtp-capabilities` – fetch router RTP capabilities used by mediasoup-client `Device.load()` - `POST /streams/:streamSessionId/sfu/publish-transport` – camera creates publish transport descriptor - `POST /streams/:streamSessionId/sfu/subscribe-transport` – participant creates subscribe transport descriptor -- `POST /streams/:streamSessionId/sfu/publish-transport/connect` – camera marks publish transport as connected -- `POST /streams/:streamSessionId/sfu/subscribe-transport/connect` – participant marks subscribe transport as connected -- `POST /streams/:streamSessionId/sfu/produce` – camera registers media producer on connected publish transport -- `POST /streams/:streamSessionId/sfu/consume` – participant creates consumer from available producer -- `public/mobile-sim.js` now uses mediasoup-client handshakes in SFU mode and attaches consumed tracks to `#clientStreamVideo`; legacy mode keeps direct WebRTC + frame relay fallback behavior. #### Streaming Scale Tradeoffs (Current Prototype) - The current implementation is **not production-grade at scale**. @@ -166,7 +157,7 @@ Experimental SFU endpoints (`MEDIA_MODE=single_server_sfu`): - Running live transport + fan-out + recording on the same web server is possible for small loads but introduces significant CPU, RAM, and network egress pressure under concurrency. - For larger deployments, use a dedicated media plane (managed or self-hosted SFU + recorder) and keep this service focused on auth/session/control APIs. - For a pragmatic prototype path that keeps media on the current server, see `docs/streaming-on-web-server-plan.md`. -- `MEDIA_MODE=single_server_sfu` requires a functional SFU engine (`MEDIA_SFU_ENGINE`, default `mediasoup`) and proper network/port exposure for WebRTC. +- `MEDIA_MODE=single_server_sfu` currently enables scaffolding only (interfaces/config/health visibility), not full SFU media routing yet. ### API Docs OpenAPI docs are generated from Zod/OpenAPI definitions: diff --git a/Backend/bun.lock b/Backend/bun.lock index 6de0373..64f6059 100644 --- a/Backend/bun.lock +++ b/Backend/bun.lock @@ -15,7 +15,6 @@ "drizzle-orm": "^0.44.0", "express": "^5.2.1", "helmet": "^8.1.0", - "mediasoup": "^3.15.6", "minio": "^8.0.6", "openai": "^6.18.0", "pg": "^8.18.0", @@ -40,9 +39,6 @@ }, }, }, - "trustedDependencies": [ - "mediasoup", - ], "packages": { "@asteasolutions/zod-to-openapi": ["@asteasolutions/zod-to-openapi@8.4.0", "", { "dependencies": { "openapi3-ts": "^4.1.2" }, "peerDependencies": { "zod": "^4.0.0" } }, "sha512-Ckp971tmTw4pnv+o7iK85ldBHBKk6gxMaoNyLn3c2Th/fKoTG8G3jdYuOanpdGqwlDB0z01FOjry2d32lfTqrA=="], @@ -114,8 +110,6 @@ "@esbuild/win32-x64": ["@esbuild/win32-x64@0.25.12", "", { "os": "win32", "cpu": "x64" }, "sha512-alJC0uCZpTFrSL0CCDjcgleBXPnCrEAhTBILpeAp7M/OFgoqtAetfBzX0xM00MUsVVPpVjlPuMbREqnZCXaTnA=="], - "@isaacs/fs-minipass": ["@isaacs/fs-minipass@4.0.1", "", { "dependencies": { "minipass": "^7.0.4" } }, "sha512-wgm9Ehl2jpeqP3zw/7mo3kRHFp5MEDhqAdwy1fTGkHAwnkGOVsgpvQhL8B5n1qlb01jV3n/bI0ZfZp5lWA1k4w=="], - "@jridgewell/resolve-uri": ["@jridgewell/resolve-uri@3.1.2", "", {}, "sha512-bRISgCIjP20/tbWSPWMEi54QVPRZExkuD9lJL+UIxUKtwVJA8wW1Trb1jMs1RFXo1CBTNZ/5hpC9QvmKWdopKw=="], "@jridgewell/sourcemap-codec": ["@jridgewell/sourcemap-codec@1.5.5", "", {}, "sha512-cYQ9310grqxueWbl+WuIUIaiUaDcj7WOq5fVhEljNVgRfOUhY9fy2zTvfoqWsnebh8Sl70VScFbICvJnLKB0Og=="], @@ -238,8 +232,6 @@ "call-bound": ["call-bound@1.0.4", "", { "dependencies": { "call-bind-apply-helpers": "^1.0.2", "get-intrinsic": "^1.3.0" } }, "sha512-+ys997U96po4Kx/ABpBCqhA9EuxJaQWDQg7295H4hBphv3IZg0boBKuwYpt4YXp6MZ5AmZQnU/tyMTlRpaSejg=="], - "chownr": ["chownr@3.0.0", "", {}, "sha512-+IxzY9BZOQd/XuYPRmrvEVjF/nqj5kgT4kEq7VofrDoM1MxoRjEWkrCC3EtLi59TVawxTAn+orJwFQcrqEN1+g=="], - "combined-stream": ["combined-stream@1.0.8", "", { "dependencies": { "delayed-stream": "~1.0.0" } }, "sha512-FQN4MRfuJeHf7cBbBMJFXhKSDq+2kAArBlmRBvcvFE5BB1HZKXtSFASDhdlz9zOYwxh8lDdnvmMOe/+5cdoEdg=="], "content-disposition": ["content-disposition@1.0.1", "", {}, "sha512-oIXISMynqSqm241k6kcQ5UwttDILMK4BiurCfGEREw6+X9jkkpEe5T9FZaApyLGGOnFuyMWZpdolTXMtvEJ08Q=="], @@ -254,8 +246,6 @@ "create-require": ["create-require@1.1.1", "", {}, "sha512-dcKFX3jn0MpIaXjisoRvexIJVEKzaq7z2rZKxf+MSr9TkdmHmsU4m2lcLojrj/FHl8mk5VxMmYA+ftRkP/3oKQ=="], - "data-uri-to-buffer": ["data-uri-to-buffer@4.0.1", "", {}, "sha512-0R9ikRb668HB7QDxT1vkpuUBtqc53YyAwMwGeUFKRojY/NWKvdZ+9UYtRfGmhqNbRkTSVpMbmyhXipFFv2cb/A=="], - "debug": ["debug@4.4.3", "", { "dependencies": { "ms": "^2.1.3" } }, "sha512-RGwwWnwQvkVfavKVt22FGLw+xYSdzARwm0ru6DhTVA3umU5hZc28V3kO4stgYryrTlLpuvgI9GiijltAjNbcqA=="], "decode-uri-component": ["decode-uri-component@0.2.2", "", {}, "sha512-FqUYQ+8o158GyGTrMFJms9qh3CqTKvAqgqsTnkLI8sKu0028orqBhxNMFkFen0zGyg6epACD32pjVk58ngIErQ=="], @@ -316,22 +306,16 @@ "fast-xml-parser": ["fast-xml-parser@4.5.3", "", { "dependencies": { "strnum": "^1.1.1" }, "bin": { "fxparser": "src/cli/cli.js" } }, "sha512-RKihhV+SHsIUGXObeVy9AXiBbFwkVk7Syp8XgwN5U3JV416+Gwp/GO9i0JYKmikykgz/UHRrrV4ROuZEo/T0ig=="], - "fetch-blob": ["fetch-blob@3.2.0", "", { "dependencies": { "node-domexception": "^1.0.0", "web-streams-polyfill": "^3.0.3" } }, "sha512-7yAQpD2UMJzLi1Dqv7qFYnPbaPx7ZfFK6PiIxQ4PfkGPyNyl2Ugx+a/umUonmKqjhM4DnfbMvdX6otXq83soQQ=="], - "filter-obj": ["filter-obj@1.1.0", "", {}, "sha512-8rXg1ZnX7xzy2NGDVkBVaAy+lSlPNwad13BtgSlLuxfIslyt5Vg64U7tFcCt4WS1R0hvtnQybT/IyCkGZ3DpXQ=="], "finalhandler": ["finalhandler@2.1.1", "", { "dependencies": { "debug": "^4.4.0", "encodeurl": "^2.0.0", "escape-html": "^1.0.3", "on-finished": "^2.4.1", "parseurl": "^1.3.3", "statuses": "^2.0.1" } }, "sha512-S8KoZgRZN+a5rNwqTxlZZePjT/4cnm0ROV70LedRHZ0p8u9fRID0hJUZQpkKLzro8LfmC8sx23bY6tVNxv8pQA=="], - "flatbuffers": ["flatbuffers@25.9.23", "", {}, "sha512-MI1qs7Lo4Syw0EOzUl0xjs2lsoeqFku44KpngfIduHBYvzm8h2+7K8YMQh1JtVVVrUvhLpNwqVi4DERegUJhPQ=="], - "follow-redirects": ["follow-redirects@1.15.11", "", {}, "sha512-deG2P0JfjrTxl50XGCDyfI97ZGVCxIpfKYmfyrQ54n5FO/0gfIES8C/Psl6kWVDolizcaaxZJnTS0QSMxvnsBQ=="], "for-each": ["for-each@0.3.5", "", { "dependencies": { "is-callable": "^1.2.7" } }, "sha512-dKx12eRCVIzqCxFGplyFKJMPvLEWgmNtUrpTiJIR5u97zEhRG8ySrtboPHZXx7daLxQVrl643cTzbab2tkQjxg=="], "form-data": ["form-data@4.0.5", "", { "dependencies": { "asynckit": "^0.4.0", "combined-stream": "^1.0.8", "es-set-tostringtag": "^2.1.0", "hasown": "^2.0.2", "mime-types": "^2.1.12" } }, "sha512-8RipRLol37bNs2bhoV67fiTEvdTrbMUYcFTiy3+wuuOnUog2QBHCZWXDRijWQfAkhBj2Uf5UnVaiWwA5vdd82w=="], - "formdata-polyfill": ["formdata-polyfill@4.0.10", "", { "dependencies": { "fetch-blob": "^3.1.2" } }, "sha512-buewHzMvYL29jdeQTVILecSaZKnt/RJWjoZCF5OW60Z67/GmSLBkOFM7qh1PI3zFNtJbaZL5eQu1vLfazOwj4g=="], - "forwarded": ["forwarded@0.2.0", "", {}, "sha512-buRG0fpBtRHSTCOASe6hD258tEubFoRLb4ZNA6NxMVHNw2gOcwHo9wyablzMzOA5z9xA9L1KNjk/Nt6MT9aYow=="], "fresh": ["fresh@2.0.0", "", {}, "sha512-Rx/WycZ60HOaqLKAi6cHRKKI7zxWbJ31MhntmtwMoaTeF7XFH9hhBp8vITaMidfljRQ6eYWCKkaTK+ykVJHP2A=="], @@ -362,8 +346,6 @@ "gtoken": ["gtoken@5.3.2", "", { "dependencies": { "gaxios": "^4.0.0", "google-p12-pem": "^3.1.3", "jws": "^4.0.0" } }, "sha512-gkvEKREW7dXWF8NV8pVrKfW7WqReAmjjkMBh6lNCCGOM4ucS0r0YyXXl0r/9Yj8wcW/32ISkfc8h5mPTDbtifQ=="], - "h264-profile-level-id": ["h264-profile-level-id@2.3.2", "", { "dependencies": { "debug": "^4.4.3" } }, "sha512-hnq1UDlw7WGJV6GCr/g7wnkHYUjdAY2bis9rgn2JqSdQS2WfVvnt1ZE9g8nTguracodf5LLKZOwURsDN49YtBQ=="], - "has-property-descriptors": ["has-property-descriptors@1.0.2", "", { "dependencies": { "es-define-property": "^1.0.0" } }, "sha512-55JNKuIW+vq4Ke1BjOTjM2YctQIvCT7GFzHwmfZPGo5wnrgkid0YQtnAleFSqumZm4az3n2BS+erby5ipJdgrg=="], "has-symbols": ["has-symbols@1.1.0", "", {}, "sha512-1cDNdwJ2Jaohmb3sg4OmKaMBwuC48sYni5HUw2DvsC8LjGTLK9h+eb1X6RyuOHe4hT0ULCW68iomhjUoKUqlPQ=="], @@ -418,8 +400,6 @@ "media-typer": ["media-typer@1.1.0", "", {}, "sha512-aisnrDP4GNe06UcKFnV5bfMNPBUw4jsLGaWwWfnH3v02GnBuXX2MCVn5RbrWo0j3pczUilYblq7fQ7Nw2t5XKw=="], - "mediasoup": ["mediasoup@3.19.17", "", { "dependencies": { "debug": "^4.4.3", "flatbuffers": "^25.9.23", "h264-profile-level-id": "^2.3.2", "node-fetch": "^3.3.2", "supports-color": "^10.2.2", "tar": "^7.5.7" } }, "sha512-wnmp/0dd56GBR5LzP+DXnDSAggykl9RncPIoUsZJffW/ggByyTqUjhC78lPGOPta+xmYLR0bKsogGQLi26S+9g=="], - "merge-descriptors": ["merge-descriptors@2.0.0", "", {}, "sha512-Snk314V5ayFLhp3fkUREub6WtjBfPdCPY1Ln8/8munuLuiYhsABgBVWsozAG+MWMbVEvcdcpbi9R7ww22l9Q3g=="], "mime-db": ["mime-db@1.54.0", "", {}, "sha512-aU5EJuIN2WDemCcAp2vFBfp/m4EAhWJnUNSSw0ixs7/kXbd6Pg64EmwJkNdFhB8aWt1sH2CTXrLxo/iAGV3oPQ=="], @@ -428,10 +408,6 @@ "minio": ["minio@8.0.6", "", { "dependencies": { "async": "^3.2.4", "block-stream2": "^2.1.0", "browser-or-node": "^2.1.1", "buffer-crc32": "^1.0.0", "eventemitter3": "^5.0.1", "fast-xml-parser": "^4.4.1", "ipaddr.js": "^2.0.1", "lodash": "^4.17.21", "mime-types": "^2.1.35", "query-string": "^7.1.3", "stream-json": "^1.8.0", "through2": "^4.0.2", "web-encoding": "^1.1.5", "xml2js": "^0.5.0 || ^0.6.2" } }, "sha512-sOeh2/b/XprRmEtYsnNRFtOqNRTPDvYtMWh+spWlfsuCV/+IdxNeKVUMKLqI7b5Dr07ZqCPuaRGU/rB9pZYVdQ=="], - "minipass": ["minipass@7.1.2", "", {}, "sha512-qOOzS1cBTWYF4BH8fVePDBOO9iptMnGUEZwNc/cMWnTV2nVLZ7VoNWEPHkYczZA0pdoA7dl6e7FL659nX9S2aw=="], - - "minizlib": ["minizlib@3.1.0", "", { "dependencies": { "minipass": "^7.1.2" } }, "sha512-KZxYo1BUkWD2TVFLr0MQoM8vUUigWD3LlD83a/75BqC+4qE0Hb1Vo5v1FgcfaNXvfXzr+5EhQ6ing/CaBijTlw=="], - "ms": ["ms@2.1.3", "", {}, "sha512-6FlzubTLZG3J2a/NVCAleEhjzq5oxgHyaCU9yYXvcLsvoVaHJq/s5xXI6/XXP6tz7R9xAOtHnSO/tXtF3WRTlA=="], "nanostores": ["nanostores@1.1.0", "", {}, "sha512-yJBmDJr18xy47dbNVlHcgdPrulSn1nhSE6Ns9vTG+Nx9VPT6iV1MD6aQFp/t52zpf82FhLLTXAXr30NuCnxvwA=="], @@ -440,9 +416,7 @@ "node-addon-api": ["node-addon-api@8.5.0", "", {}, "sha512-/bRZty2mXUIFY/xU5HLvveNHlswNJej+RnxBjOMkidWfwZzgTbPG1E3K5TOxRLOR+5hX7bSofy8yf1hZevMS8A=="], - "node-domexception": ["node-domexception@1.0.0", "", {}, "sha512-/jKZoMpw0F8GRwl4/eLROPA3cfcXtLApP0QzLmUT/HuPCZWyB7IY9ZrMeKw2O/nFIqPQB3PVM9aYm0F312AXDQ=="], - - "node-fetch": ["node-fetch@3.3.2", "", { "dependencies": { "data-uri-to-buffer": "^4.0.0", "fetch-blob": "^3.1.4", "formdata-polyfill": "^4.0.10" } }, "sha512-dRB78srN/l6gqWulah9SrxeYnxeddIG30+GOqK/9OlLVyLg3HPnr6SqOWTWOXKRwC2eGYCkZ59NNuSgvSrpgOA=="], + "node-fetch": ["node-fetch@2.7.0", "", { "dependencies": { "whatwg-url": "^5.0.0" }, "peerDependencies": { "encoding": "^0.1.0" }, "optionalPeers": ["encoding"] }, "sha512-c4FRfUm/dbcWZ7U+1Wq0AwCyFL+3nt2bEw05wfxSz+DWpWsitgmSgYmy2dQdWyKC1694ELPqMs/YzUSNozLt8A=="], "node-forge": ["node-forge@1.3.3", "", {}, "sha512-rLvcdSyRCyouf6jcOIPe/BgwG/d7hKjzMKOas33/pHEr6gbq18IK9zV7DiPvzsz0oBJPme6qr6H6kGZuI9/DZg=="], @@ -562,14 +536,10 @@ "strnum": ["strnum@1.1.2", "", {}, "sha512-vrN+B7DBIoTTZjnPNewwhx6cBA/H+IS7rfW68n7XxC1y7uoiGQBxaKzqucGUgavX15dJgiGztLJ8vxuEzwqBdA=="], - "supports-color": ["supports-color@10.2.2", "", {}, "sha512-SS+jx45GF1QjgEXQx4NJZV9ImqmO2NPz5FNsIHrsDjh2YsHnawpan7SNQ1o8NuhrbHZy9AZhIoCUiCeaW/C80g=="], - "swagger-ui-dist": ["swagger-ui-dist@5.31.0", "", { "dependencies": { "@scarf/scarf": "=1.4.0" } }, "sha512-zSUTIck02fSga6rc0RZP3b7J7wgHXwLea8ZjgLA3Vgnb8QeOl3Wou2/j5QkzSGeoz6HusP/coYuJl33aQxQZpg=="], "swagger-ui-express": ["swagger-ui-express@5.0.1", "", { "dependencies": { "swagger-ui-dist": ">=5.0.0" }, "peerDependencies": { "express": ">=4.0.0 || >=5.0.0-beta" } }, "sha512-SrNU3RiBGTLLmFU8GIJdOdanJTl4TOmT27tt3bWWHppqYmAZ6IDuEuBvMU6nZq0zLEe6b/1rACXCgLZqO6ZfrA=="], - "tar": ["tar@7.5.7", "", { "dependencies": { "@isaacs/fs-minipass": "^4.0.0", "chownr": "^3.0.0", "minipass": "^7.1.2", "minizlib": "^3.1.0", "yallist": "^5.0.0" } }, "sha512-fov56fJiRuThVFXD6o6/Q354S7pnWMJIVlDBYijsTNx6jKSE4pvrDTs6lUnmGvNyfJwFQQwWy3owKz1ucIhveQ=="], - "through2": ["through2@4.0.2", "", { "dependencies": { "readable-stream": "3" } }, "sha512-iOqSav00cVxEEICeD7TjLB1sueEL+81Wpzp2bY17uZjZN0pWZPuo4suZ/61VujxmqSGFfgOcNuTZ85QJwNZQpw=="], "toidentifier": ["toidentifier@1.0.1", "", {}, "sha512-o5sSPKEkg/DIQNmH43V0/uerLrpzVedkUh8tGNvaeXpfpuwjKenlSox/2O/BTlZUtEe+JG7s5YhEz608PlAHRA=="], @@ -600,8 +570,6 @@ "web-encoding": ["web-encoding@1.1.5", "", { "dependencies": { "util": "^0.12.3" }, "optionalDependencies": { "@zxing/text-encoding": "0.9.0" } }, "sha512-HYLeVCdJ0+lBYV2FvNZmv3HJ2Nt0QYXqZojk3d9FJOLkwnuhzM9tmamh8d7HPM8QqjKH8DeHkFTx+CFlWpZZDA=="], - "web-streams-polyfill": ["web-streams-polyfill@3.3.3", "", {}, "sha512-d2JWLCivmZYTSIoge9MsgFCZrt571BikcWGYkjC1khllbTeDlGqZ2D8vD8E/lJa8WGWbb7Plm8/XJYV7IJHZZw=="], - "webidl-conversions": ["webidl-conversions@3.0.1", "", {}, "sha512-2JAn3z8AR6rjK8Sm8orRC0h/bcl/DqL7tRPdGZ4I1CjdF+EaMLmYxBHyXuKL849eucPFhvBoxMsflfOb8kxaeQ=="], "whatwg-url": ["whatwg-url@5.0.0", "", { "dependencies": { "tr46": "~0.0.3", "webidl-conversions": "^3.0.0" } }, "sha512-saE57nupxk6v3HY35+jzBwYa0rKSy0XR8JSxZPwgLr7ys0IBzhGviA1/TUGJLmSVqs8pb9AnvICXEuOHLprYTw=="], @@ -618,7 +586,7 @@ "xtend": ["xtend@4.0.2", "", {}, "sha512-LKYU1iAXJXUgAXn9URjiu+MWhyUXHsvfp7mcuYm9dSUKK0/CjtrUwFAxD82/mCWbtLsGjFIad0wIsod4zrTAEQ=="], - "yallist": ["yallist@5.0.0", "", {}, "sha512-YgvUTfwqyc7UXVMrB+SImsVYSmTS8X/tSrtdNZMImM+n7+QTriRXyXim0mBrTXNeqzVF0KWGgHPeiyViFFrNDw=="], + "yallist": ["yallist@4.0.0", "", {}, "sha512-3wdGidZyq5PB084XLES5TpOSRA3wjXAlIWMhum2kRcv/41Sn2emQ0dycQW4uZXLejwKvg6EsvbdlVL+FYEct7A=="], "yaml": ["yaml@2.8.2", "", { "bin": { "yaml": "bin.mjs" } }, "sha512-mplynKqc1C2hTVYxd0PU2xQAc22TI1vShAYGksCCfxbn/dFwnHTNi1bvYsBTkhdUNtGIf5xNOg938rrSSYvS9A=="], @@ -638,12 +606,8 @@ "form-data/mime-types": ["mime-types@2.1.35", "", { "dependencies": { "mime-db": "1.52.0" } }, "sha512-ZDY+bPm5zTTF+YpCrAU9nK0UgICYPT0QtT1NZWFv4s++TNkcgVaT0g6+4R2uI4MjQjzysHB1zxuWL50hzaeXiw=="], - "gaxios/node-fetch": ["node-fetch@2.7.0", "", { "dependencies": { "whatwg-url": "^5.0.0" }, "peerDependencies": { "encoding": "^0.1.0" }, "optionalPeers": ["encoding"] }, "sha512-c4FRfUm/dbcWZ7U+1Wq0AwCyFL+3nt2bEw05wfxSz+DWpWsitgmSgYmy2dQdWyKC1694ELPqMs/YzUSNozLt8A=="], - "googleapis-common/uuid": ["uuid@8.3.2", "", { "bin": { "uuid": "dist/bin/uuid" } }, "sha512-+NYs2QeMWy+GWFOEm9xnn6HCDp0l7QBD7ml8zLUmJ+93Q5NF0NocErnwkTkXVFNiX3/fpC6afS8Dhb/gz7R7eg=="], - "lru-cache/yallist": ["yallist@4.0.0", "", {}, "sha512-3wdGidZyq5PB084XLES5TpOSRA3wjXAlIWMhum2kRcv/41Sn2emQ0dycQW4uZXLejwKvg6EsvbdlVL+FYEct7A=="], - "minio/mime-types": ["mime-types@2.1.35", "", { "dependencies": { "mime-db": "1.52.0" } }, "sha512-ZDY+bPm5zTTF+YpCrAU9nK0UgICYPT0QtT1NZWFv4s++TNkcgVaT0g6+4R2uI4MjQjzysHB1zxuWL50hzaeXiw=="], "proxy-addr/ipaddr.js": ["ipaddr.js@1.9.1", "", {}, "sha512-0KI/607xoxSToH7GjN1FfSbLoU0+btTicjsQSWQlh/hZykN8KpmMf7uYwPW3R+akZ6R/w18ZlXSHBYXiYUPO3g=="], diff --git a/Backend/media/sfu/mediasoup.ts b/Backend/media/sfu/mediasoup.ts deleted file mode 100644 index 5f3d502..0000000 --- a/Backend/media/sfu/mediasoup.ts +++ /dev/null @@ -1,405 +0,0 @@ -import { randomUUID } from 'crypto'; -import { networkInterfaces } from 'os'; - -import { mediaConfig } from '../config'; -import type { - SfuConnectTransportInput, - SfuConsumeInput, - SfuConsumerDescriptor, - SfuProduceInput, - SfuProducerDescriptor, - SfuPublishTransportRequest, - SfuPublishTransportResult, - SfuService, - SfuSessionDescriptor, - SfuSessionStartInput, - SfuSubscribeTransportRequest, - SfuSubscribeTransportResult, - SfuTransportDescriptor, - SfuTransportOptions, -} from './types'; - -type DynamicMediasoup = { - createWorker: (opts?: Record) => Promise; -}; - -type MediaSessionRuntime = { - session: SfuSessionDescriptor; - router: any; - transports: Map; - transportDescriptors: Map; - producers: Map; - producerDescriptors: Map; - consumers: Map; - consumerDescriptors: Map; -}; - -const parsePort = (value: string | undefined, fallback: number): number => { - const parsed = Number(value); - if (!Number.isFinite(parsed)) return fallback; - return parsed; -}; - -const pickHostIpv4 = (): string | null => { - const interfaces = networkInterfaces(); - for (const addresses of Object.values(interfaces)) { - if (!addresses) continue; - for (const address of addresses) { - if (address.family === 'IPv4' && !address.internal) { - return address.address; - } - } - } - return null; -}; - -const resolveListenAddress = (): { ip: string; announcedAddress?: string } => { - const configuredListenIp = (process.env.MEDIA_WEBRTC_LISTEN_IP ?? '').trim(); - const configuredAnnounced = process.env.MEDIA_WEBRTC_ANNOUNCED_IP?.trim(); - - if (configuredListenIp && configuredListenIp !== '0.0.0.0') { - return { - ip: configuredListenIp, - ...(configuredAnnounced ? { announcedAddress: configuredAnnounced } : {}), - }; - } - - const discoveredIp = pickHostIpv4(); - if (!configuredAnnounced && configuredListenIp === '0.0.0.0') { - console.warn( - `[sfu] MEDIA_WEBRTC_LISTEN_IP is 0.0.0.0 without MEDIA_WEBRTC_ANNOUNCED_IP. ` + - `Using ${discoveredIp ?? '127.0.0.1'} for ICE candidates. Configure both env vars for production.`, - ); - } - - const ip = discoveredIp ?? '127.0.0.1'; - return { - ip, - ...(configuredAnnounced ? { announcedAddress: configuredAnnounced } : {}), - }; -}; - -const toTransportOptions = (transport: any): SfuTransportOptions => ({ - id: transport.id, - iceParameters: transport.iceParameters ?? {}, - iceCandidates: transport.iceCandidates ?? [], - dtlsParameters: transport.dtlsParameters ?? {}, - ...(transport.sctpParameters ? { sctpParameters: transport.sctpParameters } : {}), -}); - -const mediasoupCodecs = [ - { - kind: 'audio', - mimeType: 'audio/opus', - clockRate: 48000, - channels: 2, - }, - { - kind: 'video', - mimeType: 'video/VP8', - clockRate: 90000, - parameters: {}, - }, -]; - -export class MediasoupSfuService implements SfuService { - mode: 'single_server_sfu' = 'single_server_sfu'; - private mediasoupPromise: Promise | null = null; - private workerPromise: Promise | null = null; - private readonly sessions = new Map(); - - private async getMediasoup(): Promise { - if (!this.mediasoupPromise) { - this.mediasoupPromise = (new Function('return import("mediasoup")')() as Promise).catch((error) => { - throw new Error(`mediasoup package is required for MEDIA_SFU_ENGINE=mediasoup: ${error instanceof Error ? error.message : 'load failed'}`); - }); - } - return await this.mediasoupPromise; - } - - private async getWorker(): Promise { - if (!this.workerPromise) { - this.workerPromise = (async () => { - const mediasoup = await this.getMediasoup(); - const worker = await mediasoup.createWorker({ - logLevel: (process.env.MEDIA_SFU_LOG_LEVEL ?? 'warn') as 'debug' | 'warn' | 'error' | 'none', - rtcMinPort: parsePort(process.env.MEDIA_RTC_MIN_PORT, 40000), - rtcMaxPort: parsePort(process.env.MEDIA_RTC_MAX_PORT, 49999), - }); - worker.on?.('died', () => { - console.error('mediasoup worker died; clearing worker handle'); - this.workerPromise = null; - }); - return worker; - })(); - } - return await this.workerPromise; - } - - private getRuntime(streamSessionId: string): MediaSessionRuntime { - const runtime = this.sessions.get(streamSessionId); - if (!runtime) { - throw new Error('SFU session not initialized'); - } - return runtime; - } - - private async createWebRtcTransport(router: any): Promise { - const listenAddress = resolveListenAddress(); - return await router.createWebRtcTransport({ - listenInfos: [ - { - protocol: 'udp', - ip: listenAddress.ip, - ...(listenAddress.announcedAddress ? { announcedAddress: listenAddress.announcedAddress } : {}), - }, - { - protocol: 'tcp', - ip: listenAddress.ip, - ...(listenAddress.announcedAddress ? { announcedAddress: listenAddress.announcedAddress } : {}), - }, - ], - enableUdp: true, - enableTcp: true, - preferUdp: true, - }); - } - - async startSession(input: SfuSessionStartInput): Promise { - const existing = this.sessions.get(input.streamSessionId); - if (existing) { - return existing.session; - } - - const worker = await this.getWorker(); - const router = await worker.createRouter({ mediaCodecs: mediasoupCodecs }); - const session: SfuSessionDescriptor = { - streamSessionId: input.streamSessionId, - ownerUserId: input.ownerUserId, - cameraDeviceId: input.cameraDeviceId, - requesterDeviceId: input.requesterDeviceId, - state: 'starting', - createdAt: new Date().toISOString(), - }; - - this.sessions.set(input.streamSessionId, { - session, - router, - transports: new Map(), - transportDescriptors: new Map(), - producers: new Map(), - producerDescriptors: new Map(), - consumers: new Map(), - consumerDescriptors: new Map(), - }); - return session; - } - - async setSessionState(streamSessionId: string, state: SfuSessionDescriptor['state']): Promise { - const runtime = this.getRuntime(streamSessionId); - runtime.session = { ...runtime.session, state }; - } - - async endSession(streamSessionId: string): Promise { - const runtime = this.sessions.get(streamSessionId); - if (!runtime) return; - - runtime.session = { ...runtime.session, state: 'ending' }; - for (const consumer of runtime.consumers.values()) { - consumer.close?.(); - } - for (const producer of runtime.producers.values()) { - producer.close?.(); - } - for (const transport of runtime.transports.values()) { - transport.close?.(); - } - runtime.router.close?.(); - runtime.session = { ...runtime.session, state: 'ended' }; - this.sessions.delete(streamSessionId); - } - - async getSession(streamSessionId: string): Promise { - return this.sessions.get(streamSessionId)?.session ?? null; - } - - async getRouterRtpCapabilities(streamSessionId: string): Promise | null> { - const runtime = this.sessions.get(streamSessionId); - if (!runtime) return null; - return runtime.router.rtpCapabilities ?? null; - } - - async listSessions(): Promise { - return Array.from(this.sessions.values()).map((runtime) => runtime.session); - } - - async listTransports(streamSessionId: string): Promise { - const runtime = this.sessions.get(streamSessionId); - if (!runtime) return []; - return Array.from(runtime.transportDescriptors.values()); - } - - async listProducers(streamSessionId: string): Promise { - const runtime = this.sessions.get(streamSessionId); - if (!runtime) return []; - return Array.from(runtime.producerDescriptors.values()); - } - - async listConsumers(streamSessionId: string): Promise { - const runtime = this.sessions.get(streamSessionId); - if (!runtime) return []; - return Array.from(runtime.consumerDescriptors.values()); - } - - async createPublishTransport(input: SfuPublishTransportRequest): Promise { - const runtime = this.getRuntime(input.streamSessionId); - const transport = await this.createWebRtcTransport(runtime.router); - const descriptor: SfuTransportDescriptor = { - transportId: transport.id, - streamSessionId: input.streamSessionId, - ownerDeviceId: input.cameraDeviceId, - direction: 'publish', - state: 'new', - createdAt: new Date().toISOString(), - }; - runtime.transports.set(transport.id, transport); - runtime.transportDescriptors.set(transport.id, descriptor); - return { - transportId: transport.id, - iceServers: mediaConfig.turn.urls.map((urls) => ({ - urls, - ...(mediaConfig.turn.username ? { username: mediaConfig.turn.username } : {}), - ...(mediaConfig.turn.credential ? { credential: mediaConfig.turn.credential } : {}), - })), - transportOptions: toTransportOptions(transport), - }; - } - - async createSubscribeTransport(input: SfuSubscribeTransportRequest): Promise { - const runtime = this.getRuntime(input.streamSessionId); - const transport = await this.createWebRtcTransport(runtime.router); - const descriptor: SfuTransportDescriptor = { - transportId: transport.id, - streamSessionId: input.streamSessionId, - ownerDeviceId: input.viewerDeviceId, - direction: 'subscribe', - state: 'new', - createdAt: new Date().toISOString(), - }; - runtime.transports.set(transport.id, transport); - runtime.transportDescriptors.set(transport.id, descriptor); - return { - transportId: transport.id, - iceServers: mediaConfig.turn.urls.map((urls) => ({ - urls, - ...(mediaConfig.turn.username ? { username: mediaConfig.turn.username } : {}), - ...(mediaConfig.turn.credential ? { credential: mediaConfig.turn.credential } : {}), - })), - transportOptions: toTransportOptions(transport), - }; - } - - async connectPublishTransport(input: SfuConnectTransportInput): Promise { - const runtime = this.getRuntime(input.streamSessionId); - const transport = runtime.transports.get(input.transportId); - const descriptor = runtime.transportDescriptors.get(input.transportId); - if (!transport || !descriptor) throw new Error('Publish transport not found'); - if (descriptor.direction !== 'publish') throw new Error('Transport is not a publish transport'); - if (descriptor.ownerDeviceId !== input.deviceId) throw new Error('Device does not own this publish transport'); - - await transport.connect({ dtlsParameters: input.dtlsParameters }); - const next = { ...descriptor, state: 'connected' as const }; - runtime.transportDescriptors.set(descriptor.transportId, next); - return next; - } - - async connectSubscribeTransport(input: SfuConnectTransportInput): Promise { - const runtime = this.getRuntime(input.streamSessionId); - const transport = runtime.transports.get(input.transportId); - const descriptor = runtime.transportDescriptors.get(input.transportId); - if (!transport || !descriptor) throw new Error('Subscribe transport not found'); - if (descriptor.direction !== 'subscribe') throw new Error('Transport is not a subscribe transport'); - if (descriptor.ownerDeviceId !== input.deviceId) throw new Error('Device does not own this subscribe transport'); - - await transport.connect({ dtlsParameters: input.dtlsParameters }); - const next = { ...descriptor, state: 'connected' as const }; - runtime.transportDescriptors.set(descriptor.transportId, next); - return next; - } - - async produce(input: SfuProduceInput): Promise { - const runtime = this.getRuntime(input.streamSessionId); - const transport = runtime.transports.get(input.transportId); - const descriptor = runtime.transportDescriptors.get(input.transportId); - if (!transport || !descriptor) throw new Error('Publish transport not found'); - if (descriptor.direction !== 'publish') throw new Error('Transport is not a publish transport'); - if (descriptor.ownerDeviceId !== input.cameraDeviceId) throw new Error('Device does not own this publish transport'); - if (descriptor.state !== 'connected') throw new Error('Publish transport must be connected before producing'); - - const producer = await transport.produce({ - kind: input.kind, - rtpParameters: input.rtpParameters, - appData: { cameraDeviceId: input.cameraDeviceId }, - }); - const producerId = producer.id ?? `prod_${randomUUID()}`; - const producerDescriptor: SfuProducerDescriptor = { - producerId, - streamSessionId: input.streamSessionId, - transportId: input.transportId, - cameraDeviceId: input.cameraDeviceId, - kind: input.kind, - rtpParameters: input.rtpParameters, - createdAt: new Date().toISOString(), - }; - runtime.producers.set(producerId, producer); - runtime.producerDescriptors.set(producerId, producerDescriptor); - return producerDescriptor; - } - - async consume(input: SfuConsumeInput): Promise { - const runtime = this.getRuntime(input.streamSessionId); - const transport = runtime.transports.get(input.transportId); - const descriptor = runtime.transportDescriptors.get(input.transportId); - if (!transport || !descriptor) throw new Error('Subscribe transport not found'); - if (descriptor.direction !== 'subscribe') throw new Error('Transport is not a subscribe transport'); - if (descriptor.ownerDeviceId !== input.viewerDeviceId) throw new Error('Device does not own this subscribe transport'); - if (descriptor.state !== 'connected') throw new Error('Subscribe transport must be connected before consuming'); - - const producerId = - input.producerId ?? - Array.from(runtime.producerDescriptors.values()) - .slice() - .reverse() - .find((producer) => producer.kind === 'video')?.producerId; - if (!producerId) throw new Error('No producer available for consume'); - - const producer = runtime.producers.get(producerId); - const producerDescriptor = runtime.producerDescriptors.get(producerId); - if (!producer || !producerDescriptor) throw new Error('Producer not found'); - if (!runtime.router.canConsume({ producerId: producer.id ?? producerId, rtpCapabilities: input.rtpCapabilities ?? {} })) { - throw new Error('Router cannot consume with provided RTP capabilities'); - } - - const consumer = await transport.consume({ - producerId: producer.id ?? producerId, - rtpCapabilities: input.rtpCapabilities ?? {}, - paused: false, - appData: { viewerDeviceId: input.viewerDeviceId }, - }); - - const consumerId = consumer.id ?? `cons_${randomUUID()}`; - const consumerDescriptor: SfuConsumerDescriptor = { - consumerId, - streamSessionId: input.streamSessionId, - transportId: input.transportId, - viewerDeviceId: input.viewerDeviceId, - producerId, - kind: producerDescriptor.kind, - rtpParameters: consumer.rtpParameters ?? producerDescriptor.rtpParameters, - createdAt: new Date().toISOString(), - }; - runtime.consumers.set(consumerId, consumer); - runtime.consumerDescriptors.set(consumerId, consumerDescriptor); - return consumerDescriptor; - } -} diff --git a/Backend/media/sfu/noop.ts b/Backend/media/sfu/noop.ts index 8643f12..ff449a5 100644 --- a/Backend/media/sfu/noop.ts +++ b/Backend/media/sfu/noop.ts @@ -3,12 +3,6 @@ import { randomUUID } from 'crypto'; import { mediaConfig } from '../config'; import { SfuSessionRegistry } from './registry'; import type { - SfuConnectTransportInput, - SfuConsumeInput, - SfuConsumerDescriptor, - SfuIceServer, - SfuProduceInput, - SfuProducerDescriptor, SfuPublishTransportRequest, SfuPublishTransportResult, SfuService, @@ -16,10 +10,9 @@ import type { SfuSessionStartInput, SfuSubscribeTransportRequest, SfuSubscribeTransportResult, - SfuTransportDescriptor, } from './types'; -const toIceServers = (): SfuIceServer[] => { +const toIceServers = (): Array<{ urls: string; username?: string; credential?: string }> => { if (mediaConfig.turn.urls.length === 0) { return []; } @@ -64,138 +57,21 @@ export class NoopSfuService implements SfuService { return this.registry.get(streamSessionId); } - async getRouterRtpCapabilities(_streamSessionId: string): Promise | null> { - return { - codecs: [{ mimeType: 'video/VP8', clockRate: 90000, kind: 'video' }], - headerExtensions: [], - }; - } - async listSessions(): Promise { return this.registry.list(); } - async listTransports(streamSessionId: string): Promise { - return this.registry.listTransports(streamSessionId); - } - - async listProducers(streamSessionId: string): Promise { - return this.registry.listProducers(streamSessionId); - } - - async listConsumers(streamSessionId: string): Promise { - return this.registry.listConsumers(streamSessionId); - } - - async createPublishTransport(input: SfuPublishTransportRequest): Promise { - const transportId = `pub_${randomUUID()}`; - this.registry.addTransport({ - transportId, - streamSessionId: input.streamSessionId, - ownerDeviceId: input.cameraDeviceId, - direction: 'publish', - }); + async createPublishTransport(_input: SfuPublishTransportRequest): Promise { return { - transportId, + transportId: `pub_${randomUUID()}`, iceServers: toIceServers(), - transportOptions: { - id: transportId, - iceParameters: {}, - iceCandidates: [], - dtlsParameters: {}, - }, }; } - async createSubscribeTransport(input: SfuSubscribeTransportRequest): Promise { - const transportId = `sub_${randomUUID()}`; - this.registry.addTransport({ - transportId, - streamSessionId: input.streamSessionId, - ownerDeviceId: input.viewerDeviceId, - direction: 'subscribe', - }); + async createSubscribeTransport(_input: SfuSubscribeTransportRequest): Promise { return { - transportId, + transportId: `sub_${randomUUID()}`, iceServers: toIceServers(), - transportOptions: { - id: transportId, - iceParameters: {}, - iceCandidates: [], - dtlsParameters: {}, - }, }; } - - async connectPublishTransport(input: SfuConnectTransportInput): Promise { - const transport = this.registry.getTransport(input.transportId); - if (!transport) throw new Error('Publish transport not found'); - if (transport.streamSessionId !== input.streamSessionId) throw new Error('Transport does not belong to stream'); - if (transport.direction !== 'publish') throw new Error('Transport is not a publish transport'); - if (transport.ownerDeviceId !== input.deviceId) throw new Error('Device does not own this publish transport'); - - const connected = this.registry.connectTransport(input.transportId); - if (!connected) throw new Error('Publish transport connect failed'); - return connected; - } - - async connectSubscribeTransport(input: SfuConnectTransportInput): Promise { - const transport = this.registry.getTransport(input.transportId); - if (!transport) throw new Error('Subscribe transport not found'); - if (transport.streamSessionId !== input.streamSessionId) throw new Error('Transport does not belong to stream'); - if (transport.direction !== 'subscribe') throw new Error('Transport is not a subscribe transport'); - if (transport.ownerDeviceId !== input.deviceId) throw new Error('Device does not own this subscribe transport'); - - const connected = this.registry.connectTransport(input.transportId); - if (!connected) throw new Error('Subscribe transport connect failed'); - return connected; - } - - async produce(input: SfuProduceInput): Promise { - const transport = this.registry.getTransport(input.transportId); - if (!transport) throw new Error('Publish transport not found'); - if (transport.streamSessionId !== input.streamSessionId) throw new Error('Transport does not belong to stream'); - if (transport.direction !== 'publish') throw new Error('Transport is not a publish transport'); - if (transport.ownerDeviceId !== input.cameraDeviceId) throw new Error('Device does not own this publish transport'); - if (transport.state !== 'connected') throw new Error('Publish transport must be connected before producing'); - - return this.registry.addProducer({ - producerId: `prod_${randomUUID()}`, - streamSessionId: input.streamSessionId, - transportId: input.transportId, - cameraDeviceId: input.cameraDeviceId, - kind: input.kind, - rtpParameters: input.rtpParameters, - }); - } - - async consume(input: SfuConsumeInput): Promise { - const transport = this.registry.getTransport(input.transportId); - if (!transport) throw new Error('Subscribe transport not found'); - if (transport.streamSessionId !== input.streamSessionId) throw new Error('Transport does not belong to stream'); - if (transport.direction !== 'subscribe') throw new Error('Transport is not a subscribe transport'); - if (transport.ownerDeviceId !== input.viewerDeviceId) throw new Error('Device does not own this subscribe transport'); - if (transport.state !== 'connected') throw new Error('Subscribe transport must be connected before consuming'); - - const selectedProducer = - (input.producerId ? this.registry.getProducer(input.producerId) : null) ?? - this.registry - .listProducers(input.streamSessionId) - .slice() - .reverse() - .find((producer) => producer.kind === 'video'); - - if (!selectedProducer) throw new Error('No producer available for consume'); - if (selectedProducer.streamSessionId !== input.streamSessionId) throw new Error('Producer does not belong to stream'); - - return this.registry.addConsumer({ - consumerId: `cons_${randomUUID()}`, - streamSessionId: input.streamSessionId, - transportId: input.transportId, - viewerDeviceId: input.viewerDeviceId, - producerId: selectedProducer.producerId, - kind: selectedProducer.kind, - rtpParameters: selectedProducer.rtpParameters, - }); - } } diff --git a/Backend/media/sfu/registry.ts b/Backend/media/sfu/registry.ts index 86a570d..776477c 100644 --- a/Backend/media/sfu/registry.ts +++ b/Backend/media/sfu/registry.ts @@ -1,24 +1,11 @@ -import type { - SfuConsumerDescriptor, - SfuMediaKind, - SfuProducerDescriptor, - SfuSessionDescriptor, - SfuSessionState, - SfuTransportDescriptor, - SfuTransportDirection, -} from './types'; +import type { SfuSessionDescriptor, SfuSessionState } from './types'; type StoredSfuSession = SfuSessionDescriptor & { updatedAt: string; }; -const nowIso = (): string => new Date().toISOString(); - export class SfuSessionRegistry { private readonly sessions = new Map(); - private readonly transports = new Map(); - private readonly producers = new Map(); - private readonly consumers = new Map(); get(streamSessionId: string): SfuSessionDescriptor | null { const found = this.sessions.get(streamSessionId); @@ -28,7 +15,8 @@ export class SfuSessionRegistry { } set(session: SfuSessionDescriptor): SfuSessionDescriptor { - this.sessions.set(session.streamSessionId, { ...session, updatedAt: nowIso() }); + const now = new Date().toISOString(); + this.sessions.set(session.streamSessionId, { ...session, updatedAt: now }); return session; } @@ -36,7 +24,11 @@ export class SfuSessionRegistry { const existing = this.sessions.get(streamSessionId); if (!existing) return null; - const next: StoredSfuSession = { ...existing, state, updatedAt: nowIso() }; + const next: StoredSfuSession = { + ...existing, + state, + updatedAt: new Date().toISOString(), + }; this.sessions.set(streamSessionId, next); const { updatedAt: _updatedAt, ...descriptor } = next; return descriptor; @@ -45,97 +37,5 @@ export class SfuSessionRegistry { list(): SfuSessionDescriptor[] { return Array.from(this.sessions.values()).map(({ updatedAt: _updatedAt, ...descriptor }) => descriptor); } - - addTransport(input: { - transportId: string; - streamSessionId: string; - ownerDeviceId: string; - direction: SfuTransportDirection; - }): SfuTransportDescriptor { - const descriptor: SfuTransportDescriptor = { - transportId: input.transportId, - streamSessionId: input.streamSessionId, - ownerDeviceId: input.ownerDeviceId, - direction: input.direction, - state: 'new', - createdAt: nowIso(), - }; - this.transports.set(descriptor.transportId, descriptor); - return descriptor; - } - - getTransport(transportId: string): SfuTransportDescriptor | null { - return this.transports.get(transportId) ?? null; - } - - listTransports(streamSessionId: string): SfuTransportDescriptor[] { - return Array.from(this.transports.values()).filter((transport) => transport.streamSessionId === streamSessionId); - } - - connectTransport(transportId: string): SfuTransportDescriptor | null { - const existing = this.transports.get(transportId); - if (!existing) return null; - const next: SfuTransportDescriptor = { ...existing, state: 'connected' }; - this.transports.set(transportId, next); - return next; - } - - addProducer(input: { - producerId: string; - streamSessionId: string; - transportId: string; - cameraDeviceId: string; - kind: SfuMediaKind; - rtpParameters: Record; - }): SfuProducerDescriptor { - const descriptor: SfuProducerDescriptor = { - producerId: input.producerId, - streamSessionId: input.streamSessionId, - transportId: input.transportId, - cameraDeviceId: input.cameraDeviceId, - kind: input.kind, - rtpParameters: input.rtpParameters, - createdAt: nowIso(), - }; - this.producers.set(descriptor.producerId, descriptor); - return descriptor; - } - - getProducer(producerId: string): SfuProducerDescriptor | null { - return this.producers.get(producerId) ?? null; - } - - listProducers(streamSessionId: string): SfuProducerDescriptor[] { - return Array.from(this.producers.values()) - .filter((producer) => producer.streamSessionId === streamSessionId) - .sort((left, right) => left.createdAt.localeCompare(right.createdAt)); - } - - addConsumer(input: { - consumerId: string; - streamSessionId: string; - transportId: string; - viewerDeviceId: string; - producerId: string; - kind: SfuMediaKind; - rtpParameters: Record; - }): SfuConsumerDescriptor { - const descriptor: SfuConsumerDescriptor = { - consumerId: input.consumerId, - streamSessionId: input.streamSessionId, - transportId: input.transportId, - viewerDeviceId: input.viewerDeviceId, - producerId: input.producerId, - kind: input.kind, - rtpParameters: input.rtpParameters, - createdAt: nowIso(), - }; - this.consumers.set(descriptor.consumerId, descriptor); - return descriptor; - } - - listConsumers(streamSessionId: string): SfuConsumerDescriptor[] { - return Array.from(this.consumers.values()).filter((consumer) => consumer.streamSessionId === streamSessionId); - } } diff --git a/Backend/media/sfu/service.ts b/Backend/media/sfu/service.ts index 0652638..2dec3c6 100644 --- a/Backend/media/sfu/service.ts +++ b/Backend/media/sfu/service.ts @@ -1,20 +1,14 @@ import { mediaMode } from '../config'; -import { MediasoupSfuService } from './mediasoup'; import { NoopSfuService } from './noop'; import type { SfuService } from './types'; -const sfuEngine = (process.env.MEDIA_SFU_ENGINE ?? 'mediasoup').trim().toLowerCase(); - const createSfuService = (): SfuService | null => { if (mediaMode !== 'single_server_sfu') { return null; } - if (sfuEngine === 'noop') { - return new NoopSfuService(); - } - - return new MediasoupSfuService(); + return new NoopSfuService(); }; export const sfuService = createSfuService(); + diff --git a/Backend/media/sfu/types.ts b/Backend/media/sfu/types.ts index 70575bf..dfb1986 100644 --- a/Backend/media/sfu/types.ts +++ b/Backend/media/sfu/types.ts @@ -1,21 +1,4 @@ export type SfuSessionState = 'idle' | 'starting' | 'live' | 'ending' | 'ended'; -export type SfuTransportDirection = 'publish' | 'subscribe'; -export type SfuTransportState = 'new' | 'connected'; -export type SfuMediaKind = 'audio' | 'video'; - -export type SfuIceServer = { - urls: string; - username?: string; - credential?: string; -}; - -export type SfuTransportOptions = { - id: string; - iceParameters: Record; - iceCandidates: Array>; - dtlsParameters: Record; - sctpParameters?: Record; -}; export type SfuSessionDescriptor = { streamSessionId: string; @@ -40,8 +23,7 @@ export type SfuPublishTransportRequest = { export type SfuPublishTransportResult = { transportId: string; - iceServers: SfuIceServer[]; - transportOptions?: SfuTransportOptions; + iceServers: Array<{ urls: string; username?: string; credential?: string }>; }; export type SfuSubscribeTransportRequest = { @@ -51,61 +33,7 @@ export type SfuSubscribeTransportRequest = { export type SfuSubscribeTransportResult = { transportId: string; - iceServers: SfuIceServer[]; - transportOptions?: SfuTransportOptions; -}; - -export type SfuTransportDescriptor = { - transportId: string; - streamSessionId: string; - ownerDeviceId: string; - direction: SfuTransportDirection; - state: SfuTransportState; - createdAt: string; -}; - -export type SfuProducerDescriptor = { - producerId: string; - streamSessionId: string; - transportId: string; - cameraDeviceId: string; - kind: SfuMediaKind; - rtpParameters: Record; - createdAt: string; -}; - -export type SfuConsumerDescriptor = { - consumerId: string; - streamSessionId: string; - transportId: string; - viewerDeviceId: string; - producerId: string; - kind: SfuMediaKind; - rtpParameters: Record; - createdAt: string; -}; - -export type SfuConnectTransportInput = { - streamSessionId: string; - transportId: string; - deviceId: string; - dtlsParameters: Record; -}; - -export type SfuProduceInput = { - streamSessionId: string; - transportId: string; - cameraDeviceId: string; - kind: SfuMediaKind; - rtpParameters: Record; -}; - -export type SfuConsumeInput = { - streamSessionId: string; - transportId: string; - viewerDeviceId: string; - producerId?: string; - rtpCapabilities?: Record; + iceServers: Array<{ urls: string; username?: string; credential?: string }>; }; export interface SfuService { @@ -114,15 +42,7 @@ export interface SfuService { setSessionState(streamSessionId: string, state: SfuSessionState): Promise; endSession(streamSessionId: string): Promise; getSession(streamSessionId: string): Promise; - getRouterRtpCapabilities(streamSessionId: string): Promise | null>; listSessions(): Promise; - listTransports(streamSessionId: string): Promise; - listProducers(streamSessionId: string): Promise; - listConsumers(streamSessionId: string): Promise; createPublishTransport(input: SfuPublishTransportRequest): Promise; createSubscribeTransport(input: SfuSubscribeTransportRequest): Promise; - connectPublishTransport(input: SfuConnectTransportInput): Promise; - connectSubscribeTransport(input: SfuConnectTransportInput): Promise; - produce(input: SfuProduceInput): Promise; - consume(input: SfuConsumeInput): Promise; } diff --git a/Backend/package.json b/Backend/package.json index 4fdea20..1928bb5 100644 --- a/Backend/package.json +++ b/Backend/package.json @@ -28,7 +28,6 @@ "drizzle-orm": "^0.44.0", "express": "^5.2.1", "helmet": "^8.1.0", - "mediasoup": "^3.15.6", "minio": "^8.0.6", "openai": "^6.18.0", "pg": "^8.18.0", @@ -47,8 +46,5 @@ "db:push": "drizzle-kit push", "db:studio": "drizzle-kit studio", "auth:migrate": "bun run scripts/migrate-better-auth.ts" - }, - "trustedDependencies": [ - "mediasoup" - ] + } } diff --git a/Backend/public/mobile-sim.js b/Backend/public/mobile-sim.js index 00da2f1..994873c 100644 --- a/Backend/public/mobile-sim.js +++ b/Backend/public/mobile-sim.js @@ -138,21 +138,9 @@ const API = { }, ops: { - ready: () => API.request('/ops/ready'), listRecordings: () => API.request('/recordings/me/list'), getRecordingDownloadUrl: (recordingId) => API.request(`/recordings/${recordingId}/download-url`), listNotifications: () => API.request('/push-notifications/me'), - }, - - sfu: { - getRouterRtpCapabilities: (id) => API.request(`/streams/${id}/sfu/router-rtp-capabilities`), - getSession: (id) => API.request(`/streams/${id}/sfu/session`), - createPublishTransport: (id) => API.request(`/streams/${id}/sfu/publish-transport`, { method: 'POST', body: JSON.stringify({ role: 'camera' }) }), - connectPublishTransport: (id, payload) => API.request(`/streams/${id}/sfu/publish-transport/connect`, { method: 'POST', body: JSON.stringify(payload) }), - createSubscribeTransport: (id) => API.request(`/streams/${id}/sfu/subscribe-transport`, { method: 'POST', body: JSON.stringify({ role: 'viewer' }) }), - connectSubscribeTransport: (id, payload) => API.request(`/streams/${id}/sfu/subscribe-transport/connect`, { method: 'POST', body: JSON.stringify(payload) }), - produce: (id, payload) => API.request(`/streams/${id}/sfu/produce`, { method: 'POST', body: JSON.stringify(payload) }), - consume: (id, payload) => API.request(`/streams/${id}/sfu/consume`, { method: 'POST', body: JSON.stringify(payload) }), } }; @@ -178,97 +166,10 @@ let webrtcConnected = false; let hasWebrtcEverConnected = false; let lastPeerConnectionState = null; let pendingRemoteCandidates = []; -let mediaMode = 'legacy'; -let mediasoupClientModulePromise = null; -let sfuDevice = null; -let sfuDeviceStreamSessionId = null; -let sfuSendTransport = null; -let sfuRecvTransport = null; -let sfuPublishedProducerId = null; -let sfuConsumedTrack = null; -let streamRequestInFlight = false; -let hasAutoRequestedInitialStream = false; -const inflightCameraStreamCommands = new Set(); const rtcConfig = { iceServers: [{ urls: 'stun:stun.l.google.com:19302' }], }; -const isSfuMode = () => mediaMode === 'single_server_sfu'; - -const detectMediaMode = async () => { - try { - const ready = await API.ops.ready(); - const mode = ready?.checks?.mediaMode; - if (mode === 'single_server_sfu' || mode === 'legacy') { - mediaMode = mode; - addActivity('Media', `Mode: ${mediaMode}`); - } - } catch { - mediaMode = 'legacy'; - } -}; - -const loadMediasoupClientModule = async () => { - if (!mediasoupClientModulePromise) { - mediasoupClientModulePromise = import('https://cdn.jsdelivr.net/npm/mediasoup-client@3/+esm'); - } - return await mediasoupClientModulePromise; -}; - -const closeSfuTransports = () => { - try { - sfuSendTransport?.close?.(); - } catch { } - try { - sfuRecvTransport?.close?.(); - } catch { } - - if (sfuConsumedTrack) { - try { - sfuConsumedTrack.stop(); - } catch { } - } - - sfuSendTransport = null; - sfuRecvTransport = null; - sfuPublishedProducerId = null; - sfuConsumedTrack = null; -}; - -const resetSfuRuntime = () => { - closeSfuTransports(); - sfuDevice = null; - sfuDeviceStreamSessionId = null; -}; - -const getMediasoupDeviceClass = async () => { - const module = await loadMediasoupClientModule(); - return module.Device || module.default?.Device || module.default; -}; - -const ensureSfuDevice = async (streamSessionId) => { - if (sfuDevice && sfuDeviceStreamSessionId === streamSessionId) { - return sfuDevice; - } - - resetSfuRuntime(); - - const DeviceClass = await getMediasoupDeviceClass(); - if (!DeviceClass) { - throw new Error('mediasoup-client Device class unavailable'); - } - const device = new DeviceClass(); - const caps = await API.sfu.getRouterRtpCapabilities(streamSessionId); - if (!caps?.routerRtpCapabilities) { - throw new Error('Router RTP capabilities not available'); - } - - await device.load({ routerRtpCapabilities: caps.routerRtpCapabilities }); - sfuDevice = device; - sfuDeviceStreamSessionId = streamSessionId; - return device; -}; - const init = async () => { // Load local storage const saved = localStorage.getItem('mobileSimDevice'); @@ -283,7 +184,6 @@ const init = async () => { const session = await API.auth.getSession(); if (session && session.session) { store.update({ session }); - await detectMediaMode(); if (store.get().deviceToken) { // If we have a token, skip onboarding navigateBasedOnRole(); @@ -613,7 +513,6 @@ const teardownPeerConnection = () => { if (previousSessionId) { pendingRemoteCandidates = pendingRemoteCandidates.filter((item) => item.streamSessionId !== previousSessionId); } - closeSfuTransports(); clearClientStream(); }; @@ -761,146 +660,6 @@ const startOfferToClient = async (streamSessionId, requesterDeviceId) => { }); }; -const startSfuPublishHandshake = async (streamSessionId) => { - const ready = await startCameraPreview(); - if (!ready || !localCameraStream) { - throw new Error('Camera stream unavailable for SFU publish'); - } - - const device = await ensureSfuDevice(streamSessionId); - const publishTransport = await API.sfu.createPublishTransport(streamSessionId); - const transportMeta = publishTransport?.transport; - const transportId = transportMeta?.transportId; - const transportOptions = transportMeta?.transportOptions; - if (!transportId || !transportOptions) { - throw new Error('Missing SFU publish transport parameters'); - } - - closeSfuTransports(); - const sendTransport = device.createSendTransport(transportOptions); - sfuSendTransport = sendTransport; - - sendTransport.on('connect', async ({ dtlsParameters }, callback, errback) => { - try { - await API.sfu.connectPublishTransport(streamSessionId, { - transportId, - dtlsParameters, - }); - callback(); - } catch (error) { - errback(error); - } - }); - - sendTransport.on('produce', async ({ kind, rtpParameters }, callback, errback) => { - try { - const produced = await API.sfu.produce(streamSessionId, { - transportId, - kind, - rtpParameters, - }); - const producerId = produced?.producer?.producerId; - if (!producerId) { - throw new Error('SFU producer id missing'); - } - sfuPublishedProducerId = producerId; - callback({ id: producerId }); - } catch (error) { - errback(error); - } - }); - - const track = localCameraStream.getVideoTracks()[0]; - if (!track) { - throw new Error('No local video track available for SFU publish'); - } - await sendTransport.produce({ track }); - addActivity('SFU', 'Publish handshake completed (mediasoup)'); -}; - -const setClientSfuConnectedState = () => { - if (remoteStreamWaitTimer) { - clearTimeout(remoteStreamWaitTimer); - remoteStreamWaitTimer = null; - } - if (!remoteClientStream) { - setClientStreamPlaceholderText('Connected via SFU'); - setClientStreamMode('connecting'); - } -}; - -const startSfuSubscribeHandshake = async (streamSessionId) => { - const device = await ensureSfuDevice(streamSessionId); - const subscribeTransport = await API.sfu.createSubscribeTransport(streamSessionId); - const transportMeta = subscribeTransport?.transport; - const transportId = transportMeta?.transportId; - const transportOptions = transportMeta?.transportOptions; - if (!transportId || !transportOptions) { - throw new Error('Missing SFU subscribe transport parameters'); - } - - if (sfuRecvTransport) { - try { - sfuRecvTransport.close?.(); - } catch { } - } - - const recvTransport = device.createRecvTransport(transportOptions); - sfuRecvTransport = recvTransport; - - recvTransport.on('connect', async ({ dtlsParameters }, callback, errback) => { - try { - await API.sfu.connectSubscribeTransport(streamSessionId, { - transportId, - dtlsParameters, - }); - callback(); - } catch (error) { - errback(error); - } - }); - - let consumeResult = null; - for (let attempt = 0; attempt < 8; attempt += 1) { - try { - consumeResult = await API.sfu.consume(streamSessionId, { - transportId, - rtpCapabilities: device.rtpCapabilities, - }); - if (consumeResult?.consumer?.consumerId) { - break; - } - } catch { - // Camera may still be establishing producer; retry briefly. - } - await sleep(350); - } - - if (!consumeResult?.consumer?.consumerId) { - throw new Error('SFU consumer was not created'); - } - - const consumerMeta = consumeResult.consumer; - const consumer = await recvTransport.consume({ - id: consumerMeta.consumerId, - producerId: consumerMeta.producerId, - kind: consumerMeta.kind, - rtpParameters: consumerMeta.rtpParameters, - }); - sfuConsumedTrack = consumer.track; - const stream = new MediaStream([consumer.track]); - remoteClientStream = stream; - const videoEl = $('clientStreamVideo'); - if (videoEl) { - videoEl.srcObject = stream; - setClientStreamMode('video'); - void videoEl.play().catch(() => {}); - } - - setClientSfuConnectedState(); - addActivity('SFU', `Subscribed to producer ${consumeResult.consumer.producerId} (mediasoup)`); -}; - const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms)); const finalizeRecordingForStream = async (streamSessionId, captureResult) => { @@ -917,14 +676,7 @@ const finalizeRecordingForStream = async (streamSessionId, captureResult) => { if (recording?.id) { try { if (!captureResult?.blob || captureResult.blob.size === 0) { - const fallbackObjectKey = `sim/${streamSessionId}/${Date.now()}.webm`; - await API.events.finalizeRecording(recording.id, { - objectKey: fallbackObjectKey, - durationSeconds: captureResult?.durationSeconds ?? 15, - sizeBytes: captureResult?.blob?.size ?? 0, - }); - addActivity('Recording', 'No local blob; finalized with simulator fallback'); - return true; + throw new Error('No captured video blob to upload'); } const uploadMeta = await API.request('/videos/upload-url', { @@ -986,7 +738,6 @@ const connectSocket = () => { socket.on('connect', () => { store.update({ socketConnected: true }); addActivity('System', 'Connected to realtime server'); - void detectMediaMode(); if (store.get().device?.role === 'camera') { startCameraPreview(); } @@ -997,7 +748,6 @@ const connectSocket = () => { stopFrameRelay(); void stopLocalRecording(); teardownPeerConnection(); - resetSfuRuntime(); store.update({ activeCameraDeviceId: null, activeStreamSessionId: null }); }); @@ -1008,31 +758,14 @@ const connectSocket = () => { try { if (payload.commandType === 'start_stream') { const streamId = payload.payload.streamSessionId; - if (inflightCameraStreamCommands.has(streamId)) { - addActivity('Stream', `Duplicate start ignored for ${streamId.substring(0, 8)}`); - socket.emit('command:ack', { commandId: payload.commandId, status: 'acknowledged' }); - return; - } - inflightCameraStreamCommands.add(streamId); - const ready = await startCameraPreview(); if (!ready) { throw new Error('Camera permission is required before streaming'); } - try { - await API.streams.accept(streamId); - } catch (error) { - const message = error?.message || ''; - if (!message.includes('status 409')) { - throw error; - } - addActivity('Stream', 'Accept already handled, continuing publish setup'); - } + await API.streams.accept(streamId); await API.streams.getPublishCreds(streamId); await startLocalRecording(); - if (isSfuMode()) { - await startSfuPublishHandshake(streamId); - } else if (payload.sourceDeviceId) { + if (payload.sourceDeviceId) { await startOfferToClient(streamId, payload.sourceDeviceId); frameRelayStartTimer = setTimeout(() => { if (!webrtcConnected && !hasWebrtcEverConnected) { @@ -1043,32 +776,25 @@ const connectSocket = () => { addActivity('Stream', 'Accepted & Published'); // Auto-stop after 15s for simulation setTimeout(async () => { - try { - const captureResult = await stopLocalRecording(); - await API.streams.end(streamId); - await finalizeRecordingForStream(streamId, captureResult); - stopFrameRelay(); - if (!isSfuMode() && socket && payload.sourceDeviceId) { - socket.emit('webrtc:signal', { - toDeviceId: payload.sourceDeviceId, - streamSessionId: streamId, - signalType: 'hangup', - }); - } - teardownPeerConnection(); - store.update({ activeCameraDeviceId: null, activeStreamSessionId: null }); - addActivity('Stream', 'Ended auto-simulation'); - } finally { - inflightCameraStreamCommands.delete(streamId); + const captureResult = await stopLocalRecording(); + await API.streams.end(streamId); + await finalizeRecordingForStream(streamId, captureResult); + stopFrameRelay(); + if (socket && payload.sourceDeviceId) { + socket.emit('webrtc:signal', { + toDeviceId: payload.sourceDeviceId, + streamSessionId: streamId, + signalType: 'hangup', + }); } + teardownPeerConnection(); + store.update({ activeCameraDeviceId: null, activeStreamSessionId: null }); + addActivity('Stream', 'Ended auto-simulation'); }, 15000); } socket.emit('command:ack', { commandId: payload.commandId, status: 'acknowledged' }); } catch (e) { - if (payload?.payload?.streamSessionId) { - inflightCameraStreamCommands.delete(payload.payload.streamSessionId); - } socket.emit('command:ack', { commandId: payload.commandId, status: 'rejected', error: e.message }); } }); @@ -1090,27 +816,19 @@ const connectSocket = () => { }); try { await API.streams.getSubscribeCreds(payload.streamSessionId); - if (isSfuMode()) { - await startSfuSubscribeHandshake(payload.streamSessionId); - } Toast.show('Connected to Stream', 'success'); - if (!isSfuMode()) { - remoteStreamWaitTimer = setTimeout(() => { - if (!remoteClientStream) { - Toast.show('Stream connected but no video received', 'error'); - addActivity('Stream', 'No remote video track received'); - } - }, 6000); - } + remoteStreamWaitTimer = setTimeout(() => { + if (!remoteClientStream) { + Toast.show('Stream connected but no video received', 'error'); + addActivity('Stream', 'No remote video track received'); + } + }, 6000); } catch (e) { - const message = e?.message || 'Stream connect failed'; - addActivity('SFU', message); Toast.show('Stream connect failed', 'error'); } }); socket.on('stream:frame', (payload) => { - if (isSfuMode()) return; if (webrtcConnected) return; if (!payload?.frame) return; if (remoteStreamWaitTimer) { @@ -1131,13 +849,11 @@ const connectSocket = () => { socket.on('stream:ended', (payload) => { if (payload?.streamSessionId && payload.streamSessionId === store.get().activeStreamSessionId) { clearClientStream(); - closeSfuTransports(); store.update({ activeCameraDeviceId: null, activeStreamSessionId: null }); } }); socket.on('webrtc:signal', async (payload) => { - if (isSfuMode()) return; const device = store.get().device; if (!device || !payload?.streamSessionId || !payload?.signalType || !payload?.fromDeviceId) return; @@ -1270,7 +986,6 @@ const Actions = { await API.auth.signIn({ email, password }); const session = await API.auth.getSession(); store.update({ session }); - await detectMediaMode(); Toast.show(`Welcome, ${session.user.name}`, 'success'); // Proceed @@ -1323,7 +1038,6 @@ const Actions = { store.update({ device: res.device, deviceToken: res.deviceToken }); localStorage.setItem('mobileSimDevice', JSON.stringify({ device: res.device, deviceToken: res.deviceToken })); - await detectMediaMode(); Toast.show('Device Registered', 'success'); navigateBasedOnRole(); @@ -1341,7 +1055,6 @@ const Actions = { stopFrameRelay(); await stopLocalRecording(); teardownPeerConnection(); - resetSfuRuntime(); stopCameraPreview(); localStorage.removeItem('mobileSimDevice'); Toast.show('Signed Out', 'info'); @@ -1380,25 +1093,12 @@ const Actions = { }, requestStream: async (camId) => { - if (streamRequestInFlight) { - return; - } - const current = store.get(); - if (current.activeStreamSessionId) { - return; - } - - streamRequestInFlight = true; try { store.update({ activeCameraDeviceId: camId }); Toast.show('Requesting Stream...', 'info'); await API.streams.request(camId); // Socket will handle the rest ('stream:started') - } catch (e) { - store.update({ activeCameraDeviceId: null }); - } finally { - streamRequestInFlight = false; - } + } catch (e) { } }, openRecording: async (recordingId) => { @@ -1514,8 +1214,7 @@ const render = (state) => { // 5. Client Mode Lists if (state.device?.role === 'client' && state.screen === 'home') { - if (!hasAutoRequestedInitialStream && !state.activeCameraDeviceId && state.linkedCameras.length > 0 && !streamRequestInFlight) { - hasAutoRequestedInitialStream = true; + if (!state.activeCameraDeviceId && state.linkedCameras.length > 0) { void Actions.requestStream(state.linkedCameras[0].cameraDeviceId); } diff --git a/Backend/routes/ops.ts b/Backend/routes/ops.ts index bdbf7ed..1320e9b 100644 --- a/Backend/routes/ops.ts +++ b/Backend/routes/ops.ts @@ -17,18 +17,7 @@ router.get('/ready', async (_req, res) => { try { await db.execute('select 1'); await minioClient.bucketExists(minioBucket); - const sfu = sfuService; - const sfuSessions = sfu ? await sfu.listSessions() : []; - const sfuSessionIds = sfuSessions.map((session) => session.streamSessionId); - const sfuTransports = sfu - ? (await Promise.all(sfuSessionIds.map(async (streamSessionId) => await sfu.listTransports(streamSessionId)))).flat() - : []; - const sfuProducers = sfu - ? (await Promise.all(sfuSessionIds.map(async (streamSessionId) => await sfu.listProducers(streamSessionId)))).flat() - : []; - const sfuConsumers = sfu - ? (await Promise.all(sfuSessionIds.map(async (streamSessionId) => await sfu.listConsumers(streamSessionId)))).flat() - : []; + const sfuSessions = sfuService ? await sfuService.listSessions() : []; res.json({ status: 'ready', @@ -39,9 +28,6 @@ router.get('/ready', async (_req, res) => { mediaProvider: mediaProvider.name, sfuService: sfuService ? sfuService.mode : 'disabled', sfuActiveSessions: sfuSessions.filter((session) => session.state !== 'ended').length, - sfuTransports: sfuTransports.length, - sfuProducers: sfuProducers.length, - sfuConsumers: sfuConsumers.length, }, timestamp: new Date().toISOString(), }); diff --git a/Backend/routes/streams.ts b/Backend/routes/streams.ts index 81f5971..17775b1 100644 --- a/Backend/routes/streams.ts +++ b/Backend/routes/streams.ts @@ -40,23 +40,6 @@ const sfuTransportRequestSchema = z.object({ role: z.enum(['camera', 'viewer']).optional(), }); -const sfuTransportConnectSchema = z.object({ - transportId: z.string().min(1), - dtlsParameters: z.record(z.string(), z.unknown()).default({}), -}); - -const sfuProduceSchema = z.object({ - transportId: z.string().min(1), - kind: z.enum(['audio', 'video']).default('video'), - rtpParameters: z.record(z.string(), z.unknown()).default({}), -}); - -const sfuConsumeSchema = z.object({ - transportId: z.string().min(1), - producerId: z.string().min(1).optional(), - rtpCapabilities: z.record(z.string(), z.unknown()).optional(), -}); - const listSchema = z.object({ status: z.string().optional(), limit: z.coerce.number().int().min(1).max(100).default(25), @@ -105,14 +88,6 @@ const getOwnedStreamSession = async (streamSessionId: string, ownerUserId: strin where: and(eq(streamSessions.id, streamSessionId), eq(streamSessions.ownerUserId, ownerUserId)), }); -const ensureSfuEnabled = (res: Parameters[1]) => { - if (!sfuService) { - res.status(409).json({ message: `SFU service disabled (MEDIA_MODE=${mediaMode})` }); - return null; - } - return sfuService; -}; - router.post('/request', requireDeviceAuth, async (req, res) => { const parsed = requestStreamSchema.safeParse(req.body ?? {}); @@ -471,8 +446,10 @@ router.get('/:streamSessionId/sfu/session', requireDeviceAuth, async (req, res) const deviceAuth = ensureDeviceAuth(req, res); if (!deviceAuth) return; - const sfu = ensureSfuEnabled(res); - if (!sfu) return; + if (!sfuService) { + res.status(409).json({ message: `SFU service disabled (MEDIA_MODE=${mediaMode})` }); + return; + } const session = await getOwnedStreamSession(parsedParams.data.streamSessionId, deviceAuth.userId); if (!session) { @@ -486,58 +463,11 @@ router.get('/:streamSessionId/sfu/session', requireDeviceAuth, async (req, res) return; } - const [sfuSession, transports, producers, consumers] = await Promise.all([ - sfu.getSession(session.id), - sfu.listTransports(session.id), - sfu.listProducers(session.id), - sfu.listConsumers(session.id), - ]); - + const sfuSession = await sfuService.getSession(session.id); res.json({ streamSessionId: session.id, mediaMode, sfuSession, - transports, - producers, - consumers, - }); -}); - -router.get('/:streamSessionId/sfu/router-rtp-capabilities', requireDeviceAuth, async (req, res) => { - const parsedParams = streamParamSchema.safeParse(req.params); - if (!parsedParams.success) { - res.status(400).json({ message: 'Invalid streamSessionId', errors: parsedParams.error.flatten() }); - return; - } - - const deviceAuth = ensureDeviceAuth(req, res); - if (!deviceAuth) return; - - const sfu = ensureSfuEnabled(res); - if (!sfu) return; - - const session = await getOwnedStreamSession(parsedParams.data.streamSessionId, deviceAuth.userId); - if (!session) { - res.status(404).json({ message: 'Stream session not found' }); - return; - } - - const isParticipant = session.requesterDeviceId === deviceAuth.deviceId || session.cameraDeviceId === deviceAuth.deviceId; - if (!isParticipant) { - res.status(403).json({ message: 'Device cannot access SFU router capabilities for this stream' }); - return; - } - - const routerRtpCapabilities = await sfu.getRouterRtpCapabilities(session.id); - if (!routerRtpCapabilities) { - res.status(409).json({ message: 'SFU session is not initialized for this stream' }); - return; - } - - res.json({ - streamSessionId: session.id, - mediaMode, - routerRtpCapabilities, }); }); @@ -557,8 +487,10 @@ router.post('/:streamSessionId/sfu/publish-transport', requireDeviceAuth, async const deviceAuth = ensureDeviceAuth(req, res); if (!deviceAuth) return; - const sfu = ensureSfuEnabled(res); - if (!sfu) return; + if (!sfuService) { + res.status(409).json({ message: `SFU service disabled (MEDIA_MODE=${mediaMode})` }); + return; + } const session = await getOwnedStreamSession(parsedParams.data.streamSessionId, deviceAuth.userId); if (!session) { @@ -576,11 +508,11 @@ router.post('/:streamSessionId/sfu/publish-transport', requireDeviceAuth, async return; } - const transport = await sfu.createPublishTransport({ + const transport = await sfuService.createPublishTransport({ streamSessionId: session.id, cameraDeviceId: deviceAuth.deviceId, }); - await sfu.setSessionState(session.id, 'live'); + await sfuService.setSessionState(session.id, 'live'); res.json({ streamSessionId: session.id, @@ -605,8 +537,10 @@ router.post('/:streamSessionId/sfu/subscribe-transport', requireDeviceAuth, asyn const deviceAuth = ensureDeviceAuth(req, res); if (!deviceAuth) return; - const sfu = ensureSfuEnabled(res); - if (!sfu) return; + if (!sfuService) { + res.status(409).json({ message: `SFU service disabled (MEDIA_MODE=${mediaMode})` }); + return; + } const session = await getOwnedStreamSession(parsedParams.data.streamSessionId, deviceAuth.userId); if (!session) { @@ -625,11 +559,11 @@ router.post('/:streamSessionId/sfu/subscribe-transport', requireDeviceAuth, asyn return; } - const transport = await sfu.createSubscribeTransport({ + const transport = await sfuService.createSubscribeTransport({ streamSessionId: session.id, viewerDeviceId: deviceAuth.deviceId, }); - await sfu.setSessionState(session.id, 'live'); + await sfuService.setSessionState(session.id, 'live'); res.json({ streamSessionId: session.id, @@ -638,186 +572,6 @@ router.post('/:streamSessionId/sfu/subscribe-transport', requireDeviceAuth, asyn }); }); -router.post('/:streamSessionId/sfu/publish-transport/connect', requireDeviceAuth, async (req, res) => { - const parsedParams = streamParamSchema.safeParse(req.params); - if (!parsedParams.success) { - res.status(400).json({ message: 'Invalid streamSessionId', errors: parsedParams.error.flatten() }); - return; - } - - const parsedBody = sfuTransportConnectSchema.safeParse(req.body ?? {}); - if (!parsedBody.success) { - res.status(400).json({ message: 'Invalid request body', errors: parsedBody.error.flatten() }); - return; - } - - const deviceAuth = ensureDeviceAuth(req, res); - if (!deviceAuth) return; - - const sfu = ensureSfuEnabled(res); - if (!sfu) return; - - const session = await getOwnedStreamSession(parsedParams.data.streamSessionId, deviceAuth.userId); - if (!session) { - res.status(404).json({ message: 'Stream session not found' }); - return; - } - - if (session.cameraDeviceId !== deviceAuth.deviceId) { - res.status(403).json({ message: 'Only camera device can connect publish transport' }); - return; - } - - try { - const transport = await sfu.connectPublishTransport({ - streamSessionId: session.id, - transportId: parsedBody.data.transportId, - deviceId: deviceAuth.deviceId, - dtlsParameters: parsedBody.data.dtlsParameters, - }); - await sfu.setSessionState(session.id, 'live'); - res.json({ streamSessionId: session.id, mediaMode, transport }); - } catch (error) { - res.status(409).json({ message: error instanceof Error ? error.message : 'Unable to connect publish transport' }); - } -}); - -router.post('/:streamSessionId/sfu/subscribe-transport/connect', requireDeviceAuth, async (req, res) => { - const parsedParams = streamParamSchema.safeParse(req.params); - if (!parsedParams.success) { - res.status(400).json({ message: 'Invalid streamSessionId', errors: parsedParams.error.flatten() }); - return; - } - - const parsedBody = sfuTransportConnectSchema.safeParse(req.body ?? {}); - if (!parsedBody.success) { - res.status(400).json({ message: 'Invalid request body', errors: parsedBody.error.flatten() }); - return; - } - - const deviceAuth = ensureDeviceAuth(req, res); - if (!deviceAuth) return; - - const sfu = ensureSfuEnabled(res); - if (!sfu) return; - - const session = await getOwnedStreamSession(parsedParams.data.streamSessionId, deviceAuth.userId); - if (!session) { - res.status(404).json({ message: 'Stream session not found' }); - return; - } - - const isParticipant = session.requesterDeviceId === deviceAuth.deviceId || session.cameraDeviceId === deviceAuth.deviceId; - if (!isParticipant) { - res.status(403).json({ message: 'Device cannot connect subscribe transport for this stream' }); - return; - } - - try { - const transport = await sfu.connectSubscribeTransport({ - streamSessionId: session.id, - transportId: parsedBody.data.transportId, - deviceId: deviceAuth.deviceId, - dtlsParameters: parsedBody.data.dtlsParameters, - }); - await sfu.setSessionState(session.id, 'live'); - res.json({ streamSessionId: session.id, mediaMode, transport }); - } catch (error) { - res.status(409).json({ message: error instanceof Error ? error.message : 'Unable to connect subscribe transport' }); - } -}); - -router.post('/:streamSessionId/sfu/produce', requireDeviceAuth, async (req, res) => { - const parsedParams = streamParamSchema.safeParse(req.params); - if (!parsedParams.success) { - res.status(400).json({ message: 'Invalid streamSessionId', errors: parsedParams.error.flatten() }); - return; - } - - const parsedBody = sfuProduceSchema.safeParse(req.body ?? {}); - if (!parsedBody.success) { - res.status(400).json({ message: 'Invalid request body', errors: parsedBody.error.flatten() }); - return; - } - - const deviceAuth = ensureDeviceAuth(req, res); - if (!deviceAuth) return; - - const sfu = ensureSfuEnabled(res); - if (!sfu) return; - - const session = await getOwnedStreamSession(parsedParams.data.streamSessionId, deviceAuth.userId); - if (!session) { - res.status(404).json({ message: 'Stream session not found' }); - return; - } - - if (session.cameraDeviceId !== deviceAuth.deviceId) { - res.status(403).json({ message: 'Only camera device can publish media' }); - return; - } - - try { - const producer = await sfu.produce({ - streamSessionId: session.id, - transportId: parsedBody.data.transportId, - cameraDeviceId: deviceAuth.deviceId, - kind: parsedBody.data.kind, - rtpParameters: parsedBody.data.rtpParameters, - }); - await sfu.setSessionState(session.id, 'live'); - res.json({ streamSessionId: session.id, mediaMode, producer }); - } catch (error) { - res.status(409).json({ message: error instanceof Error ? error.message : 'Unable to produce media' }); - } -}); - -router.post('/:streamSessionId/sfu/consume', requireDeviceAuth, async (req, res) => { - const parsedParams = streamParamSchema.safeParse(req.params); - if (!parsedParams.success) { - res.status(400).json({ message: 'Invalid streamSessionId', errors: parsedParams.error.flatten() }); - return; - } - - const parsedBody = sfuConsumeSchema.safeParse(req.body ?? {}); - if (!parsedBody.success) { - res.status(400).json({ message: 'Invalid request body', errors: parsedBody.error.flatten() }); - return; - } - - const deviceAuth = ensureDeviceAuth(req, res); - if (!deviceAuth) return; - - const sfu = ensureSfuEnabled(res); - if (!sfu) return; - - const session = await getOwnedStreamSession(parsedParams.data.streamSessionId, deviceAuth.userId); - if (!session) { - res.status(404).json({ message: 'Stream session not found' }); - return; - } - - const isParticipant = session.requesterDeviceId === deviceAuth.deviceId || session.cameraDeviceId === deviceAuth.deviceId; - if (!isParticipant) { - res.status(403).json({ message: 'Device cannot consume media for this stream' }); - return; - } - - try { - const consumer = await sfu.consume({ - streamSessionId: session.id, - transportId: parsedBody.data.transportId, - viewerDeviceId: deviceAuth.deviceId, - producerId: parsedBody.data.producerId, - rtpCapabilities: parsedBody.data.rtpCapabilities, - }); - await sfu.setSessionState(session.id, 'live'); - res.json({ streamSessionId: session.id, mediaMode, consumer }); - } catch (error) { - res.status(409).json({ message: error instanceof Error ? error.message : 'Unable to consume media' }); - } -}); - router.post('/:streamSessionId/end', requireDeviceAuth, async (req, res) => { const parsedParams = streamParamSchema.safeParse(req.params);