Kurento Application

kurento application

This project demonstrates a basic backend implementation of a Kurento video call application. It supports WebRTC streaming from a webcam or screen recording and enables HLS-to-WebRTC transcoding.

Services Used

  • Coturn – TURN server for NAT traversal

  • Kurento Media Server – Handles media processing and streaming

Nodes Used

  • Inject/Timestamp – Triggers the flow

  • Function – Adds custom JavaScript logic

  • Debug – Outputs messages to the debug console

  • PostgreSQL node – Executes SQL queries via msg.query

  • JSON – Parses JSON into JavaScript objects

  • Switch – Routes messages based on conditions

  • WebSocket In – Receives WebSocket messages from the frontend

  • WebSocket Out – Sends WebSocket messages to the frontend

  • IP – Retrieves the IP address of the host

  • kurento-client-node – Custom node from the node-red-contrib-kurento package to create and manage Kurento clients

For details, see Node-RED Documentation

GROUP: Definition of global variables and functions

An Inject node triggers several function nodes when the flow is deployed. Global variables and functions are stored using the context object. The set method saves a reference to a variable or function under a specific key, allowing other nodes in the project to access it using the get method.

FUNCTION: Check if Message Has Been Returned

The flow.get method retrieves the pipelinesMap object, which contains information about currently defined pipelines, and the releasePipeline function. Pipeline keys (names) are passed as arguments to releasePipeline to remove the corresponding pipelines in the Kurento Media Server.

let pipelinesMap = flow.get('pipelinesMap');
msg.payload = pipelinesMap;
let releasePipeline = flow.get('releasePipeline');
for (let key of Object.keys(pipelinesMap)) {
    releasePipeline(key);
}
msg.status = 'webapp flow: all pipelines and media elements have been released';
return msg;

FUNCTION: Process SDP Offer

This function defines processSdpOffer and stores it in the context using the flow.set method. It acts as a wrapper around the processOffer method, which is part of a WebRTC endpoint instance.

An SDP (Session Description Protocol) offer is a message sent during the WebRTC connection setup. It contains media configuration details (e.g., codecs, media types) that a WebRTC endpoint (a component responsible for media exchange) uses to negotiate a connection.

const processSdpOffer = function (webRtcEndpoint, sdpOffer, sessionId, callback) {
    webRtcEndpoint.processOffer(sdpOffer, function (error, sdpAnswer) {
        if (error) {
            console.log('error in processing sdp offer: ', error);
        }
        callback(null, sdpAnswer);
    });
}

flow.set('processSdpOffer', processSdpOffer);

FUNCTION: Clear ICE Candidates Queue

This function defines clearCandidatesQueue and saves it in the context using flow.set.

ICE (Interactive Connectivity Establishment) candidates are network addresses used in WebRTC to find the best path for media streams between peers. This function clears the stored ICE candidates for a given session ID from the candidatesQueue.

const clearCandidatesQueue = function (sessionId) {
    let candidatesQueue = flow.get('candidatesQueue');
    if (candidatesQueue[sessionId]) {
        delete candidatesQueue[sessionId];
    }
    flow.set('candidatesQueue', candidatesQueue);
}

flow.set('clearCandidatesQueue', clearCandidatesQueue);

msg.payload = 'clear candidates queue function defined and globally set';
return msg;

FUNCTION: Create Kurento Media Server Element

This function defines createElement and stores it in the context using flow.set. It creates a new Kurento Media Server (KMS) element within a given pipeline and returns a Promise.

Kurento supports various types of media elements: - WebRtcEndpoint: handles WebRTC connections. - PlayerEndpoint: streams media from a file or URL. - RecorderEndpoint: records media to a file. - FaceOverlayFilter, ZBarFilter, etc.: process or analyze media streams.

const createElement = function (pipeline, type, options = {}) {
    return new Promise((resolve, reject) => {
        pipeline.create(type, options, function (error, element) {
            if (error) {
                reject(error);
            } else {
                resolve(element);
            }
        });
    });
}

flow.set('createElement', createElement);
msg.payload = 'create element function defined and globally set';
return msg;

FUNCTION: Gather and Send ICE Candidates

This function defines handleIceCandidate and saves it in the context using flow.set.

In WebRTC, ICE (Interactive Connectivity Establishment) candidates help establish the best connection path between peers. This function handles queued ICE candidates for a given session by sending them to the WebRtcEndpoint, and listens for new candidates to forward them to the client.

const handleIceCandidate = function (sessionId, webRtcEndpoint) {
    console.log('handle ice function');
    let candidatesQueue = flow.get('candidatesQueue');
    if (candidatesQueue[sessionId]) {
        console.log(sessionId);
        while (candidatesQueue[sessionId].length) {
            let candidate = candidatesQueue[sessionId].shift();
            webRtcEndpoint.addIceCandidate(candidate, (error) => {
                console.error('error adding ICE candidate: ', error);
                msg.error = error;
                return msg;
            });
        }
    }

    webRtcEndpoint.on('IceCandidateFound', (event) => {
        let candidate = kurentoClient.getComplexType('IceCandidate')(event.candidate);
        console.log(sessionId);
        msg.payload = JSON.stringify({
            id: 'iceCandidate',
            candidate: candidate,
            sessionId: sessionId
        });
        node.send(msg);
    });

    webRtcEndpoint.gatherCandidates(function (error) {
        if (error) {
            console.log('error in gathering candidates: ', error);
        }
    });
}

flow.set('handleIceCandidate', handleIceCandidate);
return;

GROUP: send/receive WS messages

developer settings in GUI

Flow in the group is listening to incoming websocket messages.

GROUP: handle different WebSocket messages

developer settings in GUI

Group is used to handle incoming ws messages and direct them to correct flows based on msg.id attribute with the use of switch node.

GROUP: Kurento Stream Logic

This group handles media stream operations in the Kurento Media Server, such as creating pipelines and adding endpoints (e.g., WebRtcEndpoint, PlayerEndpoint).

A custom Kurento Client node is used to establish the connection to the Kurento Server.

developer settings in GUI

The configuration requires the Kurento WebSocket URL. Once connected, it returns an instance of the Kurento client, which provides methods for managing WebRTC connections, including creating pipelines and adding media elements.

FLOW: Create Pipeline for HLS/RTSP Video Source

HLS (HTTP Live Streaming) and RTSP (Real-Time Streaming Protocol) are streaming formats used to deliver video over networks. Kurento uses the PlayerEndpoint to read these streams and convert them into WebRTC format, allowing playback in browsers.

This flow creates a streaming pipeline from an HLS/RTSP source to a WebRTC peer.

Workflow Steps

  • Create MediaPipeline Initializes the main container for media elements.

    createElement(kurentoClient, 'MediaPipeline')
        .then(mediaPipeline => {
            pipeline = mediaPipeline;
            ...
  • Add PlayerEndpoint Loads the stream from the given URI.

    return createElement(pipeline, 'PlayerEndpoint', { uri: message.source, networkCache: 0 });
  • Add WebRtcEndpoint Enables streaming to the browser.

    return createElement(pipeline, 'WebRtcEndpoint');
  • Save Pipeline to Context Stores references for later use or cleanup.

    let pipelineObject = {
        pipeline: pipeline,
        playerEndpoint: playerEndpoint,
        webRtcEndpoint: webRtcEndpoint
    };
    pipelinesMap[sessionId] = pipelineObject;
    flow.set('pipelinesMap', pipelinesMap);
  • Handle SDP and ICE Completes WebRTC connection setup.

    processSdpOffer(webRtcEndpoint, message.sdpOffer, null, function (error, sdpAnswer) {
        msg.payload = { id: 'newPipeline', response: 'accepted', sessionId: sessionId, sdpAnswer: sdpAnswer };
        node.send(msg);
    });
    handleIceCandidate(sessionId, webRtcEndpoint);
  • Connect and Start Streaming Connects endpoints and begins playback.

    playerEndpoint.connect(webRtcEndpoint, "VIDEO", function (error) { ... });
    playerEndpoint.play(function (error) { ... });

Result

The HLS/RTSP stream is converted and delivered to the frontend WebRTC client.

FLOW: Start Recording

This flow starts recording a stream in an existing Kurento pipeline using the RecorderEndpoint. The recorded media is saved locally as an MP4 file.

Workflow Steps

  • Get Pipeline for Session Retrieve the pipeline instance from the context.

    let sessionId = msg.parsedMessage.sessionId;
    let pipelinesMap = flow.get('pipelinesMap');
    const kurentoClient = msg.payload;
    const createElement = flow.get('createElement');
    
    if (pipelinesMap[sessionId]) {
        var pipelineInstance = pipelinesMap[sessionId];
    } else {
        console.error('there is no pipeline instance for this sessionId');
        return;
    }
  • Create RecorderEndpoint Define where the video will be saved and in what format.

    createElement(pipelineInstance.pipeline, 'RecorderEndpoint', {
        uri: 'file:///tmp/demo.mp4',
        mediaProfile: 'MP4_VIDEO_ONLY'
    }).then(rEndpoint => {
        pipelinesMap[sessionId].recorderEndpoint = rEndpoint;
  • Connect Player to Recorder Connect the media flow from the PlayerEndpoint to the RecorderEndpoint.

    pipelineInstance.playerEndpoint.connect(rEndpoint, function (error) {
        if (error) {
            console.log('error in connecting player and recorder endpoints: ', error);
        }
    });
  • Start Recording Begin writing the stream to the file.

    rEndpoint.record(function (error) {
        if (error) {
            console.log('error in recording: ', error);
        } else {
            console.log('recording has started');
        }
    });

Result

The current stream is recorded and saved as an MP4 file on the server.

FLOW: Create Pipeline for Webcam/Screen Sharing

This flow sets up a Kurento pipeline to stream media from a webcam or screen share via WebRTC.

Workflow Steps

  • Initialize Session and Clear ICE Queue Extract session ID and clear any queued ICE candidates.

    let sessionId = msg.parsedMessage.sessionId;
    let message = msg.parsedMessage;
    const clearCandidatesQueue = flow.get('clearCandidatesQueue')(sessionId);
  • Create MediaPipeline Creates the main pipeline container for the session.

    const kurentoClient = msg.payload;
    const createElement = flow.get('createElement');
    const handleIceCandidate = flow.get('handleIceCandidate');
    const processSdpOffer = flow.get('processSdpOffer');
    let pipelinesMap = flow.get('pipelinesMap');
    
    let pipeline;
    let webRtcEndpoint;
    
    createElement(kurentoClient, 'MediaPipeline')
        .then(mediaPipeline => {
            pipeline = mediaPipeline;
            ...
  • Create WebRtcEndpoint Adds a WebRTC endpoint to send media to the browser.

    return createElement(pipeline, 'WebRtcEndpoint');
  • Set TURN and Save Pipeline Configures TURN for NAT traversal and stores the pipeline object.

    webRtcEndpoint.setTurnUrl(global.get('turnConfig'));
    
    let pipelineObject = {
        pipeline: pipeline,
        webRtcEndpoint: webRtcEndpoint
    };
    pipelinesMap[sessionId] = pipelineObject;
    flow.set('pipelinesMap', pipelinesMap);
  • Process SDP Offer and Handle ICE Completes the WebRTC negotiation and starts ICE candidate handling.

    processSdpOffer(webRtcEndpoint, message.sdpOffer, null, function (error, sdpAnswer) {
        msg.payload = { id: 'newPipeline', response: 'accepted', sessionId: sessionId, sdpAnswer: sdpAnswer };
        node.send(msg);
    });
    
    handleIceCandidate(sessionId, webRtcEndpoint);
  • Error Handling Logs and forwards errors if any occur during setup.

    .catch(error => {
        console.error(error);
        msg.error = error;
        node.send(msg);
    });

Result

A WebRTC pipeline is created for streaming webcam or screen sharing media to a browser client.

FLOW: Connect to Existing Video Source

This flow connects a new WebRTC viewer to an already running video stream (e.g., from a player or webcam). A new WebRtcEndpoint is created for the viewer and connected to the existing media pipeline.

Workflow Steps

  • Initialize Session and Clear ICE Queue Prepare session and viewer IDs, and clear any queued ICE candidates.

    let sessionId = msg.parsedMessage.sessionId;
    let viewerId = msg.parsedMessage.viewerId;
    let message = msg.parsedMessage;
    
    const clearCandidatesQueue = flow.get('clearCandidatesQueue')(sessionId);
  • Load Existing Pipeline and Create Viewer WebRtcEndpoint Get the existing pipeline and create a new WebRtcEndpoint for the viewer.

    let kurentoClient = msg.payload;
    let createElement = flow.get('createElement');
    let handleIceCandidate = flow.get('handleIceCandidate');
    let processSdpOffer = flow.get('processSdpOffer');
    let pipelinesMap = flow.get('pipelinesMap');
    
    let pipeline = pipelinesMap[sessionId];
    let webRtcEndpoint;
    
    createElement(pipeline.pipeline, 'WebRtcEndpoint')
        .then(wRtcEndpoint => {
            webRtcEndpoint = wRtcEndpoint;
  • Set TURN and Save Viewer Endpoint Configure TURN server and store the viewer’s endpoint.

    webRtcEndpoint.setTurnUrl(global.get('turnConfig'));
    
    let pipelineObject = {
        webRtcEndpoint: webRtcEndpoint
    };
    
    pipelinesMap[viewerId] = pipelineObject;
    flow.set('pipelinesMap', pipelinesMap);
  • Connect Stream to Viewer Link the source endpoint to the viewer’s WebRTC endpoint.

    pipeline.webRtcEndpoint.connect(webRtcEndpoint, function (error) {
        if (error) {
            console.error('Error connecting playerEndpoint to webRtcEndpoint: ', error);
            return;
        }
        console.log('PlayerEndpoint connected to WebRtcEndpoint');
    });
  • Process SDP Offer and Handle ICE Negotiate connection and start ICE handling for the viewer.

    processSdpOffer(webRtcEndpoint, message.sdpOffer, null, function (error, sdpAnswer) {
        msg.payload = { id: 'newPipeline', response: 'accepted', sessionId: viewerId, sdpAnswer: sdpAnswer };
        node.send(msg);
    });
    handleIceCandidate(viewerId, webRtcEndpoint);
  • Error Handling Log and send errors if the setup fails.

    .catch(error => {
        console.error(error);
        msg.error = error;
        node.send(msg);
    });

Result

A new viewer is connected to the existing media pipeline and receives the stream via WebRTC.

GROUP: Collect WebRTC Peer Metrics

This group collects statistics from connected WebRTC peers. An Inject node is configured to trigger data collection every 5 seconds.

FLOW: Send Request Message

Every 5 seconds, the Inject node sends a message with ID request-stats to all connected clients via a WebSocket connection.

FLOW: Handle Peer Metrics

Clients respond with their metrics. The function node recv client stats and concat them collects and aggregates metrics from all peers.

let pipelinesMap = flow.get('pipelinesMap');
if (Object.keys(pipelinesMap).length === 0) {
    msg.payload = 'noSessions';
    delete msg._session;
    return msg;
}
let stats = context.get('stats') || [];
stats = stats.concat(msg.payload.data);
if (Object.keys(pipelinesMap).length <= stats.length) {
    msg.payload = stats;
    delete msg._session;
    context.set('stats', []);
} else {
    context.set('stats', stats);
    return;
}
return msg;

Two additional function nodes process the aggregated data—e.g., calculating average metrics over the last minute or computing execution time—and send the results to display nodes on a custom dashboard.