Kurento Application

This project demonstrates a basic backend implementation of a Kurento video call application. It supports WebRTC streaming from a webcam or screen recording and enables HLS-to-WebRTC transcoding.
Services Used
- Coturn – TURN server for NAT traversal
- Kurento Media Server – Handles media processing and streaming
Nodes Used
- Inject/Timestamp – Triggers the flow
- Function – Adds custom JavaScript logic
- Debug – Outputs messages to the debug console
- PostgreSQL node – Executes SQL queries via msg.query
- JSON – Parses JSON into JavaScript objects
- Switch – Routes messages based on conditions
- WebSocket In – Receives WebSocket messages from the frontend
- WebSocket Out – Sends WebSocket messages to the frontend
- IP – Retrieves the IP address of the host
- kurento-client-node – Custom node from the node-red-contrib-kurento package to create and manage Kurento clients
For details, see the Node-RED Documentation.
GROUP: Definition of global variables and functions
An Inject node triggers several function nodes when the flow is deployed.
Global variables and functions are stored using the context object.
The set method saves a reference to a variable or function under a specific key, allowing other nodes in the project to access it using the get method.
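To illustrate the set/get pattern described above, here is a minimal sketch that runs in plain Node.js. The `flow` object is a hypothetical in-memory stand-in for the context that Node-RED provides inside a function node, and `greet` is an illustrative helper that is not part of this project.

```javascript
// Hypothetical stand-in for the Node-RED flow context, so the sketch runs in plain Node.js.
// Inside a function node, `flow` is provided by Node-RED and this stub is not needed.
const store = new Map();
const flow = {
    set: (key, value) => { store.set(key, value); },
    get: (key) => store.get(key)
};

// "Definition" node: save a variable and a function under fixed keys.
flow.set('pipelinesMap', {});
flow.set('greet', (name) => `hello ${name}`); // illustrative helper

// Any other node in the same flow: retrieve the references by key.
const pipelinesMap = flow.get('pipelinesMap');
const greet = flow.get('greet');

pipelinesMap['session-1'] = { note: 'example entry' };

// In this stub, objects are stored by reference, so the change is visible via the context.
const sameObject = flow.get('pipelinesMap') === pipelinesMap;
const greeting = greet('kurento');
console.log(sameObject, greeting);
```

Storing functions this way keeps shared logic in one place; every function node that needs it only has to know the key.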
FUNCTION: Check if Message Has Been Returned
The flow.get method retrieves the pipelinesMap object, which contains information about currently defined pipelines, and the releasePipeline function.
Pipeline keys (names) are passed as arguments to releasePipeline to remove the corresponding pipelines in the Kurento Media Server.
let pipelinesMap = flow.get('pipelinesMap');
msg.payload = pipelinesMap;
let releasePipeline = flow.get('releasePipeline');
for (let key of Object.keys(pipelinesMap)) {
    releasePipeline(key);
}
msg.status = 'webapp flow: all pipelines and media elements have been released';
return msg;
FUNCTION: Process SDP Offer
This function defines processSdpOffer and stores it in the context using the flow.set method.
It acts as a wrapper around the processOffer method, which is part of a WebRTC endpoint instance.
An SDP (Session Description Protocol) offer is a message sent during the WebRTC connection setup. It contains media configuration details (e.g., codecs, media types) that a WebRTC endpoint (a component responsible for media exchange) uses to negotiate a connection.
const processSdpOffer = function (webRtcEndpoint, sdpOffer, sessionId, callback) {
    webRtcEndpoint.processOffer(sdpOffer, function (error, sdpAnswer) {
        if (error) {
            console.log('error in processing sdp offer: ', error);
            // Pass the error on instead of reporting success with an undefined answer
            return callback(error);
        }
        callback(null, sdpAnswer);
    });
}
flow.set('processSdpOffer', processSdpOffer);
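As a rough illustration of how a caller might react to the outcome of the SDP negotiation, the sketch below uses a wrapper in the style of processSdpOffer that passes errors to its callback. `fakeEndpoint`, `buildReply`, and the `'rejected'` response value are assumptions made for this example; only the `'accepted'` reply shape appears in this project.

```javascript
// Hypothetical endpoint stub: answers every offer except an empty one.
const fakeEndpoint = {
    processOffer(sdpOffer, callback) {
        if (!sdpOffer) {
            callback(new Error('empty SDP offer'));
        } else {
            callback(null, 'answer-for:' + sdpOffer);
        }
    }
};

// Wrapper in the style of processSdpOffer; errors are passed to the callback.
function processSdpOffer(webRtcEndpoint, sdpOffer, sessionId, callback) {
    webRtcEndpoint.processOffer(sdpOffer, (error, sdpAnswer) => {
        if (error) {
            return callback(error);
        }
        callback(null, sdpAnswer);
    });
}

// Hypothetical caller: build the WebSocket reply depending on the outcome.
function buildReply(sessionId, sdpOffer) {
    let reply;
    processSdpOffer(fakeEndpoint, sdpOffer, sessionId, (error, sdpAnswer) => {
        reply = error
            ? { id: 'newPipeline', response: 'rejected', sessionId, message: error.message }
            : { id: 'newPipeline', response: 'accepted', sessionId, sdpAnswer };
    });
    // Available here only because the stub calls back synchronously;
    // with a real endpoint the reply would be sent from inside the callback.
    return reply;
}

const accepted = buildReply('s1', 'v=0');
const rejected = buildReply('s2', '');
console.log(accepted.response, rejected.response);
```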
FUNCTION: Clear ICE Candidates Queue
This function defines clearCandidatesQueue and saves it in the context using flow.set.
ICE (Interactive Connectivity Establishment) candidates are network addresses used in WebRTC to find the best path for media streams between peers.
This function clears the stored ICE candidates for a given session ID from the candidatesQueue.
const clearCandidatesQueue = function (sessionId) {
    let candidatesQueue = flow.get('candidatesQueue');
    if (candidatesQueue[sessionId]) {
        delete candidatesQueue[sessionId];
    }
    flow.set('candidatesQueue', candidatesQueue);
}
flow.set('clearCandidatesQueue', clearCandidatesQueue);
msg.payload = 'clear candidates queue function defined and globally set';
return msg;
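The lifecycle of the queue can be sketched as follows. The node that fills candidatesQueue is not shown in this section, so `queueCandidate` is a hypothetical helper, and `flow` is an in-memory stand-in for the Node-RED context.

```javascript
// Hypothetical stand-in for the Node-RED flow context (provided by Node-RED in a function node).
const store = new Map([['candidatesQueue', {}]]);
const flow = {
    get: (key) => store.get(key),
    set: (key, value) => { store.set(key, value); }
};

// Hypothetical helper: remember a candidate until the session's endpoint exists.
function queueCandidate(sessionId, candidate) {
    const candidatesQueue = flow.get('candidatesQueue');
    if (!candidatesQueue[sessionId]) {
        candidatesQueue[sessionId] = [];
    }
    candidatesQueue[sessionId].push(candidate);
    flow.set('candidatesQueue', candidatesQueue);
}

// Same logic as clearCandidatesQueue: drop everything queued for one session.
function clearCandidatesQueue(sessionId) {
    const candidatesQueue = flow.get('candidatesQueue');
    delete candidatesQueue[sessionId];
    flow.set('candidatesQueue', candidatesQueue);
}

queueCandidate('a', { candidate: 'candidate:1' });
queueCandidate('a', { candidate: 'candidate:2' });
queueCandidate('b', { candidate: 'candidate:3' });
const queuedForA = flow.get('candidatesQueue')['a'].length;

clearCandidatesQueue('a');
const remainingSessions = Object.keys(flow.get('candidatesQueue'));
console.log(queuedForA, remainingSessions);
```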
FUNCTION: Create Kurento Media Server Element
This function defines createElement and stores it in the context using flow.set.
It creates a new Kurento Media Server (KMS) element within a given pipeline and returns a Promise.
Kurento supports various types of media elements:
- WebRtcEndpoint: handles WebRTC connections.
- PlayerEndpoint: streams media from a file or URL.
- RecorderEndpoint: records media to a file.
- FaceOverlayFilter, ZBarFilter, etc.: process or analyze media streams.
const createElement = function (pipeline, type, options = {}) {
    return new Promise((resolve, reject) => {
        pipeline.create(type, options, function (error, element) {
            if (error) {
                reject(error);
            } else {
                resolve(element);
            }
        });
    });
}
flow.set('createElement', createElement);
msg.payload = 'create element function defined and globally set';
return msg;
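Because createElement returns a Promise, several elements can be created in sequence with a single error path. The following sketch shows this under stated assumptions: `fakePipeline` is a hypothetical stub that mimics only the callback signature of `create`, and the URI is a placeholder.

```javascript
// createElement as defined above, repeated so the sketch is self-contained.
const createElement = function (pipeline, type, options = {}) {
    return new Promise((resolve, reject) => {
        pipeline.create(type, options, function (error, element) {
            if (error) { reject(error); } else { resolve(element); }
        });
    });
};

// Hypothetical pipeline stub: accepts two known types and rejects anything else.
const fakePipeline = {
    create(type, options, callback) {
        if (type === 'WebRtcEndpoint' || type === 'PlayerEndpoint') {
            callback(null, { type, options });
        } else {
            callback(new Error('unknown element type: ' + type));
        }
    }
};

// Create elements one after another; the first failure skips to catch().
const elements = {};
const done = createElement(fakePipeline, 'PlayerEndpoint', { uri: 'rtsp://example.invalid/stream' })
    .then(player => {
        elements.player = player;
        return createElement(fakePipeline, 'WebRtcEndpoint');
    })
    .then(webRtc => {
        elements.webRtc = webRtc;
        return createElement(fakePipeline, 'NoSuchElement');
    })
    .catch(error => {
        elements.error = error.message;
    })
    .then(() => elements);

done.then(result => console.log(Object.keys(result)));
```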
FUNCTION: Gather and Send ICE Candidates
This function defines handleIceCandidate and saves it in the context using flow.set.
In WebRTC, ICE (Interactive Connectivity Establishment) candidates help establish the best connection path between peers.
This function handles queued ICE candidates for a given session by sending them to the WebRtcEndpoint, and listens for new candidates to forward them to the client.
const handleIceCandidate = function (sessionId, webRtcEndpoint) {
    console.log('handle ice function');
    let candidatesQueue = flow.get('candidatesQueue');
    // Forward candidates that were queued before the endpoint existed
    if (candidatesQueue[sessionId]) {
        console.log(sessionId);
        while (candidatesQueue[sessionId].length) {
            let candidate = candidatesQueue[sessionId].shift();
            webRtcEndpoint.addIceCandidate(candidate, (error) => {
                // Report only when the callback actually carries an error
                if (error) {
                    console.error('error adding ICE candidate: ', error);
                    msg.error = error;
                }
            });
        }
    }
    // Send every candidate found by Kurento to the client
    webRtcEndpoint.on('IceCandidateFound', (event) => {
        // Note: kurentoClient is not defined in this function and must be available in its scope
        let candidate = kurentoClient.getComplexType('IceCandidate')(event.candidate);
        console.log(sessionId);
        msg.payload = JSON.stringify({
            id: 'iceCandidate',
            candidate: candidate,
            sessionId: sessionId
        });
        node.send(msg);
    });
    webRtcEndpoint.gatherCandidates(function (error) {
        if (error) {
            console.log('error in gathering candidates: ', error);
        }
    });
}
flow.set('handleIceCandidate', handleIceCandidate);
return;
GROUP: Handle Different WebSocket Messages

This group handles incoming WebSocket messages and uses a Switch node to route each one to the correct flow based on its msg.id attribute.
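The routing itself is configured in the Switch node rather than written as code, but its effect can be approximated in JavaScript. In the sketch below, the message ids (`start`, `onIceCandidate`, `stop`) and the handler bodies are hypothetical; the real ids are defined by the frontend protocol, which is not listed in this section.

```javascript
// Hypothetical message ids and handlers, used only to illustrate the routing idea.
const handlers = {
    start: (message) => `create pipeline for ${message.sessionId}`,
    onIceCandidate: (message) => `queue candidate for ${message.sessionId}`,
    stop: (message) => `release pipeline for ${message.sessionId}`
};

// Route a raw WebSocket payload roughly the way a JSON node plus a Switch node would.
function route(rawPayload) {
    let message;
    try {
        message = JSON.parse(rawPayload);
    } catch (error) {
        return 'invalid JSON';
    }
    if (message === null || typeof message !== 'object') {
        return 'invalid message';
    }
    // Only own properties count, so ids such as "toString" are not treated as handlers.
    const known = Object.prototype.hasOwnProperty.call(handlers, message.id);
    return known ? handlers[message.id](message) : `unhandled id: ${message.id}`;
}

const routed = route('{"id":"start","sessionId":"s1"}');
const unknown = route('{"id":"toString"}');
const broken = route('not json');
console.log(routed, '|', unknown, '|', broken);
```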
GROUP: Kurento Stream Logic
This group handles media stream operations in the Kurento Media Server, such as creating pipelines and adding endpoints (e.g., WebRtcEndpoint, PlayerEndpoint).
A custom Kurento Client node is used to establish the connection to the Kurento Server.

The configuration requires the Kurento WebSocket URL. Once connected, it returns an instance of the Kurento client, which provides methods for managing WebRTC connections, including creating pipelines and adding media elements.
FLOW: Create Pipeline for HLS/RTSP Video Source
HLS (HTTP Live Streaming) and RTSP (Real-Time Streaming Protocol) are streaming protocols used to deliver video over networks.
Kurento uses the PlayerEndpoint to read these streams and convert them into WebRTC format, allowing playback in browsers.
This flow creates a streaming pipeline from an HLS/RTSP source to a WebRTC peer.
Workflow Steps
- Create MediaPipeline: initializes the main container for media elements.
    createElement(kurentoClient, 'MediaPipeline')
        .then(mediaPipeline => {
            pipeline = mediaPipeline;
            ...
- Add PlayerEndpoint: loads the stream from the given URI.
    return createElement(pipeline, 'PlayerEndpoint', {
        uri: message.source,
        networkCache: 0
    });
- Add WebRtcEndpoint: enables streaming to the browser.
    return createElement(pipeline, 'WebRtcEndpoint');
- Save Pipeline to Context: stores references for later use or cleanup.
    let pipelineObject = {
        pipeline: pipeline,
        playerEndpoint: playerEndpoint,
        webRtcEndpoint: webRtcEndpoint
    };
    pipelinesMap[sessionId] = pipelineObject;
    flow.set('pipelinesMap', pipelinesMap);
- Handle SDP and ICE: completes WebRTC connection setup.
    processSdpOffer(webRtcEndpoint, message.sdpOffer, null, function (error, sdpAnswer) {
        msg.payload = {
            id: 'newPipeline',
            response: 'accepted',
            sessionId: sessionId,
            sdpAnswer: sdpAnswer
        };
        node.send(msg);
    });
    handleIceCandidate(sessionId, webRtcEndpoint);
- Connect and Start Streaming: connects endpoints and begins playback.
    playerEndpoint.connect(webRtcEndpoint, "VIDEO", function (error) { ... });
    playerEndpoint.play(function (error) { ... });
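The steps above can be sketched end to end as a single Promise chain. This is an approximation under stated assumptions, not the project's actual function node: `fakeClient`, `fakePipeline`, and `makeEndpoint` are hypothetical stubs that only record calls, `createPlayerPipeline` is an illustrative name, and SDP/ICE handling is omitted. The source elides the connect and play callbacks, so calling them one after the other is also an assumption.

```javascript
// Promise wrapper in the style of createElement.
const createElement = (parent, type, options = {}) =>
    new Promise((resolve, reject) => {
        parent.create(type, options, (error, element) => (error ? reject(error) : resolve(element)));
    });

// Hypothetical stubs standing in for the Kurento client objects; they only record calls.
const log = [];
const makeEndpoint = (type) => ({
    type,
    connect(sink, mediaType, callback) { log.push(`${type} -> ${sink.type} (${mediaType})`); callback(null); },
    play(callback) { log.push(`${type} play`); callback(null); }
});
const fakePipeline = {
    create(type, options, callback) { log.push(`create ${type}`); callback(null, makeEndpoint(type)); }
};
const fakeClient = {
    create(type, options, callback) { log.push(`create ${type}`); callback(null, fakePipeline); }
};

// The workflow steps in order: pipeline, player, WebRTC endpoint, save, connect, play.
function createPlayerPipeline(kurentoClient, pipelinesMap, sessionId, sourceUri) {
    let pipeline, playerEndpoint, webRtcEndpoint;
    return createElement(kurentoClient, 'MediaPipeline')
        .then(mediaPipeline => {
            pipeline = mediaPipeline;
            return createElement(pipeline, 'PlayerEndpoint', { uri: sourceUri, networkCache: 0 });
        })
        .then(player => {
            playerEndpoint = player;
            return createElement(pipeline, 'WebRtcEndpoint');
        })
        .then(webRtc => {
            webRtcEndpoint = webRtc;
            pipelinesMap[sessionId] = { pipeline, playerEndpoint, webRtcEndpoint };
            // SDP negotiation and ICE handling would happen here (omitted in this sketch).
            playerEndpoint.connect(webRtcEndpoint, 'VIDEO', (error) => {
                if (error) { console.error('connect failed: ', error); }
            });
            playerEndpoint.play((error) => {
                if (error) { console.error('play failed: ', error); }
            });
            return pipelinesMap[sessionId];
        });
}

const pipelinesMap = {};
const ready = createPlayerPipeline(fakeClient, pipelinesMap, 'session-1', 'rtsp://example.invalid/stream');
ready.then(() => console.log(log.join('\n')));
```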
FLOW: Start Recording
This flow starts recording a stream in an existing Kurento pipeline using the RecorderEndpoint.
The recorded media is saved locally as an MP4 file.
Workflow Steps
- Get Pipeline for Session: retrieves the pipeline instance from the context.
    let sessionId = msg.parsedMessage.sessionId;
    let pipelinesMap = flow.get('pipelinesMap');
    const kurentoClient = msg.payload;
    const createElement = flow.get('createElement');
    const pipelineInstance = pipelinesMap[sessionId];
    if (!pipelineInstance) {
        console.error('there is no pipeline instance for this sessionId');
        return;
    }
- Create RecorderEndpoint: defines where the video will be saved and in what format.
    createElement(pipelineInstance.pipeline, 'RecorderEndpoint', {
        uri: 'file:///tmp/demo.mp4',
        mediaProfile: 'MP4_VIDEO_ONLY'
    }).then(rEndpoint => {
        pipelinesMap[sessionId].recorderEndpoint = rEndpoint;
- Connect Player to Recorder: connects the media flow from the PlayerEndpoint to the RecorderEndpoint.
    pipelineInstance.playerEndpoint.connect(rEndpoint, function (error) {
        if (error) {
            console.log('error in connecting player and recorder endpoints: ', error);
        }
    });
- Start Recording: begins writing the stream to the file.
    rEndpoint.record(function (error) {
        if (error) {
            console.log('error in recording: ', error);
        } else {
            console.log('recording has started');
        }
    });
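The recording steps above write every session to the same file, file:///tmp/demo.mp4. If separate files per session are wanted, the options object could be built by a small helper. `recorderOptions` and its naming scheme are hypothetical and shown only as one possible approach; the `uri` and `mediaProfile` keys are the ones used in this flow.

```javascript
// Hypothetical helper: build RecorderEndpoint options with one file per session.
function recorderOptions(sessionId, startedAt = new Date()) {
    // Keep only characters that are safe in a file name.
    const safeId = String(sessionId).replace(/[^A-Za-z0-9_-]/g, '_');
    // Turn the ISO timestamp into a file-name-friendly form (no milliseconds, no colons).
    const stamp = startedAt.toISOString().replace(/\.\d{3}Z$/, 'Z').replace(/:/g, '-');
    return {
        uri: `file:///tmp/${safeId}_${stamp}.mp4`,
        mediaProfile: 'MP4_VIDEO_ONLY'
    };
}

const options = recorderOptions('abc/../1', new Date(Date.UTC(2024, 0, 2, 3, 4, 5)));
console.log(options.uri);
```

The result could then be passed as the third argument of createElement in place of the fixed options object.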
FLOW: Create Pipeline for Webcam/Screen Sharing
This flow sets up a Kurento pipeline to stream media from a webcam or screen share via WebRTC.
Workflow Steps
- Initialize Session and Clear ICE Queue: extracts the session ID and clears any queued ICE candidates.
    let sessionId = msg.parsedMessage.sessionId;
    let message = msg.parsedMessage;
    // clearCandidatesQueue returns nothing, so its result is not stored
    flow.get('clearCandidatesQueue')(sessionId);
- Create MediaPipeline: creates the main pipeline container for the session.
    const kurentoClient = msg.payload;
    const createElement = flow.get('createElement');
    const handleIceCandidate = flow.get('handleIceCandidate');
    const processSdpOffer = flow.get('processSdpOffer');
    let pipelinesMap = flow.get('pipelinesMap');
    let pipeline;
    let webRtcEndpoint;
    createElement(kurentoClient, 'MediaPipeline')
        .then(mediaPipeline => {
            pipeline = mediaPipeline;
            ...
- Create WebRtcEndpoint: adds a WebRTC endpoint to send media to the browser.
    return createElement(pipeline, 'WebRtcEndpoint');
- Set TURN and Save Pipeline: configures TURN for NAT traversal and stores the pipeline object.
    webRtcEndpoint.setTurnUrl(global.get('turnConfig'));
    let pipelineObject = {
        pipeline: pipeline,
        webRtcEndpoint: webRtcEndpoint
    };
    pipelinesMap[sessionId] = pipelineObject;
    flow.set('pipelinesMap', pipelinesMap);
- Process SDP Offer and Handle ICE: completes the WebRTC negotiation and starts ICE candidate handling.
    processSdpOffer(webRtcEndpoint, message.sdpOffer, null, function (error, sdpAnswer) {
        msg.payload = {
            id: 'newPipeline',
            response: 'accepted',
            sessionId: sessionId,
            sdpAnswer: sdpAnswer
        };
        node.send(msg);
    });
    handleIceCandidate(sessionId, webRtcEndpoint);
- Error Handling: logs and forwards errors if any occur during setup.
    .catch(error => {
        console.error(error);
        msg.error = error;
        node.send(msg);
    });
FLOW: Connect to Existing Video Source
This flow connects a new WebRTC viewer to an already running video stream (e.g., from a player or webcam).
A new WebRtcEndpoint is created for the viewer and connected to the existing media pipeline.
Workflow Steps
- Initialize Session and Clear ICE Queue: prepares the session and viewer IDs and clears any queued ICE candidates.
    let sessionId = msg.parsedMessage.sessionId;
    let viewerId = msg.parsedMessage.viewerId;
    let message = msg.parsedMessage;
    // clearCandidatesQueue returns nothing, so its result is not stored
    flow.get('clearCandidatesQueue')(sessionId);
- Load Existing Pipeline and Create Viewer WebRtcEndpoint: gets the existing pipeline and creates a new WebRtcEndpoint for the viewer.
    let kurentoClient = msg.payload;
    let createElement = flow.get('createElement');
    let handleIceCandidate = flow.get('handleIceCandidate');
    let processSdpOffer = flow.get('processSdpOffer');
    let pipelinesMap = flow.get('pipelinesMap');
    let pipeline = pipelinesMap[sessionId];
    let webRtcEndpoint;
    createElement(pipeline.pipeline, 'WebRtcEndpoint')
        .then(wRtcEndpoint => {
            webRtcEndpoint = wRtcEndpoint;
- Set TURN and Save Viewer Endpoint: configures the TURN server and stores the viewer's endpoint.
    webRtcEndpoint.setTurnUrl(global.get('turnConfig'));
    let pipelineObject = {
        webRtcEndpoint: webRtcEndpoint
    };
    pipelinesMap[viewerId] = pipelineObject;
    flow.set('pipelinesMap', pipelinesMap);
- Connect Stream to Viewer: links the source endpoint to the viewer's WebRTC endpoint.
    pipeline.webRtcEndpoint.connect(webRtcEndpoint, function (error) {
        if (error) {
            console.error('Error connecting source WebRtcEndpoint to viewer WebRtcEndpoint: ', error);
            return;
        }
        console.log('Source WebRtcEndpoint connected to viewer WebRtcEndpoint');
    });
- Process SDP Offer and Handle ICE: negotiates the connection and starts ICE handling for the viewer.
    processSdpOffer(webRtcEndpoint, message.sdpOffer, null, function (error, sdpAnswer) {
        msg.payload = {
            id: 'newPipeline',
            response: 'accepted',
            sessionId: viewerId,
            sdpAnswer: sdpAnswer
        };
        node.send(msg);
    });
    handleIceCandidate(viewerId, webRtcEndpoint);
- Error Handling: logs and sends errors if the setup fails.
    .catch(error => {
        console.error(error);
        msg.error = error;
        node.send(msg);
    });
GROUP: Collect WebRTC Peer Metrics
This group collects statistics from connected WebRTC peers. An Inject node is configured to trigger data collection every 5 seconds.
FLOW: Send Request Message
Every 5 seconds, the Inject node sends a message with ID request-stats to all connected clients via a WebSocket connection.
FLOW: Handle Peer Metrics
Clients respond with their metrics. The function node "recv client stats and concat them" collects and aggregates metrics from all peers.
let pipelinesMap = flow.get('pipelinesMap');
if (Object.keys(pipelinesMap).length === 0) {
    msg.payload = 'noSessions';
    delete msg._session;
    return msg;
}
let stats = context.get('stats') || [];
stats = stats.concat(msg.payload.data);
if (Object.keys(pipelinesMap).length <= stats.length) {
    msg.payload = stats;
    delete msg._session;
    context.set('stats', []);
} else {
    context.set('stats', stats);
    return;
}
return msg;
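The buffering logic can be easier to follow as a pure function without the Node-RED objects. The sketch below is an assumption-laden restatement: `collectStats` is a hypothetical name, and the report shape (`sessionId`, `bitrate`) is invented for the example because the real metric fields are not listed in this section.

```javascript
// Pure version of the aggregation step: returns the new buffer and, once every
// session has reported, the batch to forward (otherwise null).
function collectStats(buffer, incoming, sessionCount) {
    if (sessionCount === 0) {
        // No sessions: signal it and leave the buffer untouched.
        return { buffer, output: 'noSessions' };
    }
    const stats = buffer.concat(incoming);
    if (sessionCount <= stats.length) {
        return { buffer: [], output: stats }; // batch complete, reset the buffer
    }
    return { buffer: stats, output: null }; // keep waiting for more reports
}

// Simulate three sessions reporting one after another (hypothetical report shape).
const reports = [
    { sessionId: 'a', bitrate: 100 },
    { sessionId: 'b', bitrate: 200 },
    { sessionId: 'c', bitrate: 300 }
];
let buffer = [];
const outputs = [];
for (const report of reports) {
    const result = collectStats(buffer, report, reports.length);
    buffer = result.buffer;
    outputs.push(result.output);
}
console.log(outputs);
```

Only the third call produces a batch; the first two return null, which corresponds to the bare `return;` in the function node.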
Two additional function nodes process the aggregated data (e.g., calculating average metrics over the last minute or computing execution time) and send the results to display nodes on a custom dashboard.
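As one possible reading of "average metrics over the last minute", the sketch below averages a numeric field over a 60-second window. The function name `averageLastMinute`, the sample shape (`timestamp` in milliseconds, `bitrate`), and the fixed `now` value are assumptions for illustration; the project's own processing nodes are not shown here.

```javascript
// Hypothetical sample shape: { timestamp: <ms since epoch>, bitrate: <number> }.
function averageLastMinute(samples, field, now = Date.now()) {
    const windowStart = now - 60 * 1000;
    // Keep samples newer than the window start and not in the future.
    const recent = samples.filter(s => s.timestamp > windowStart && s.timestamp <= now);
    if (recent.length === 0) {
        return null; // nothing to average
    }
    const sum = recent.reduce((total, s) => total + s[field], 0);
    return sum / recent.length;
}

const now = 1700000000000; // fixed reference time so the example is reproducible
const samples = [
    { timestamp: now - 90000, bitrate: 100 }, // older than one minute, ignored
    { timestamp: now - 30000, bitrate: 200 },
    { timestamp: now - 5000, bitrate: 400 }
];
const average = averageLastMinute(samples, 'bitrate', now);
console.log(average);
```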