16 Nov 2023 · Software Engineering

    Reactive Programming in JavaScript With RxJS: A Paradigm Shift

    21 min read

    Software development is an ever-evolving field. As developers, we face a myriad of challenges every day, solving complex logic, dealing with asynchronous events, and managing the chaos of concurrency. One of the top-tier challenges, especially in JavaScript development, has been taming the beast that is asynchronicity.

    As web applications grow more interactive and data-driven, handling multiple asynchronous operations and their complex interactions becomes critical. Traditional programming models have often proven inadequate for these challenges, necessitating new paradigms of thinking and programming. This is where Reactive Programming comes into the picture, and more specifically, its implementation in JavaScript through the Reactive Extensions for JavaScript (RxJS) library.

    Reactive Programming is sometimes confused with ReactJS, which can be traced to the similarity in their names. While both RxJS and ReactJS are powerful tools in the JavaScript ecosystem, they serve different purposes. RxJS deals with asynchronous data flows, while ReactJS focuses on rendering and managing the UI.

    Article Objectives

    The goal of this article is to provide readers with an understanding of reactive programming concepts, demonstrate the benefits of using reactive programming in JavaScript, and guide readers on how to effectively use RxJS to write cleaner, more maintainable, and robust code.

    Prerequisite

    Readers should have a solid understanding of JavaScript, including ES6 features. Knowledge of asynchronous programming in JavaScript (Promises, async/await) will be helpful. While prior experience with reactive programming is not required, it will be beneficial. To follow along with the article’s examples, feel free to clone this git repo.

    Understanding the Event-Driven Nature of JavaScript

    JavaScript is fundamentally event-driven, which means it is designed to respond to user interactions or events. This is what makes JavaScript so powerful for web development, enabling interactive and dynamic behavior on web pages.

    At the heart of the event-driven nature of JavaScript are “events”. Events are actions that happen in the browser, initiated by the user or the browser itself. Examples of such events include a user clicking a button, scrolling through a web page, or a timer completing its countdown.

    When such events occur, JavaScript allows you to define event handlers, which are functions called when a specific event happens. To tie an event to an event handler, we use a process called event registration. This typically involves selecting an HTML element, specifying an event to listen for, and providing an event handler that should be invoked when that event occurs.

    This event-driven model allows for a highly interactive user experience. Instead of the program running from top to bottom and ending (like in procedural programming), event-driven JavaScript waits for user actions and responds accordingly.
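
    As a rough illustration of event registration, the sketch below ties a handler to a click event. It uses a bare `EventTarget` as a stand-in for a DOM element (an assumption, so that the snippet also runs outside a browser); on a real page you would typically select an element first. The names `button`, `handleClick`, and `handled` are hypothetical.

```javascript
// Stand-in event source. In a browser this would usually be a DOM element
// such as a button; EventTarget is used here purely for illustration.
const button = new EventTarget();
const handled = [];

// Event handler: called each time the registered event happens.
function handleClick(event) {
    handled.push(`handled ${event.type}`);
}

// Event registration: tie the 'click' event to the handler.
button.addEventListener('click', handleClick);

// Simulate two user clicks.
button.dispatchEvent(new Event('click'));
button.dispatchEvent(new Event('click'));

console.log(handled); // [ 'handled click', 'handled click' ]
```

    Note that nothing runs until an event is dispatched: the program registers its interest and then waits, which is the behavior described above.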

    Understanding Reactive Programming

    While JavaScript’s event-driven nature enables us to build dynamic, interactive applications, handling complex scenarios with multiple asynchronous operations can quickly become difficult. This is where Reactive Programming comes into play.

    To fully grasp the essence of Reactive Programming, let’s draw an analogy that paints a vivid picture of this concept. Imagine a tranquil scene featuring a steady stream and an observant fisherman. This metaphor not only embodies the principles of Reactive Programming but also makes the concept relatable and easy to understand.

    Think of the stream as a never-ending source of data that flows continuously. The content of the stream could be anything, floating leaves, small and big fishes, or even occasionally a message in a bottle. This is analogous to the stream of data in a reactive programming context where data comes from various sources like user inputs, server responses, or results from computations.

    Now, consider the fisherman, standing by the stream with his fishing rod, as a JavaScript programmer. His job is to respond or “react” to whatever he catches with his fishing rod, which in our case, is an event from the data stream. Each time he catches a fish (or event), he makes a decision based on what he catches. He might keep the fish (analogous to processing the data), throw it back into the stream (ignoring the event), or even transform it (like tagging the fish with a tracker and releasing, which can be likened to modifying the data).

    The fisherman also has to deal with the unpredictable nature of the stream. Fish might bite the bait at any moment or not at all, much like events in JavaScript that happen asynchronously. For instance, it could be a user clicking a button, an API returning a response, or a timer ticking – all of these are asynchronous events, happening at their own time.

    Now, here’s where the power of reactive programming shines. Imagine the fisherman can clone himself to handle multiple rods, each catching different types of things from the stream. This is like handling multiple event sources in reactive programming. Also, instead of catching fish individually, the fisherman could use a net to catch a bunch of them at the same time – this mirrors handling multiple events simultaneously in our programming context.

    With RxJS, JavaScript developers can treat asynchronous events just like our fisherman treats the stream. They can listen to events, transform them, filter them, combine them, and much more. The unpredictable nature of asynchronous events becomes manageable, just like the unpredictable stream becomes a source of opportunity for the patient and prepared fisherman.

    Introduction to RxJS

    RxJS, short for Reactive Extensions for JavaScript, is a library that uses observable sequences to enable developers to work with asynchronous data streams. It’s essentially a set of tools that allows you to apply Reactive Programming principles in JavaScript.

    At its core, RxJS revolves around four fundamental concepts: Observables, Observers, Operators, and Subjects. These concepts form the building blocks of reactive programming and enable you to handle asynchronous operations, events, and data streams in a more streamlined and declarative way.

    Observables

    These are the cornerstone of RxJS. They represent a stream of values that can be observed over time. In other words, an Observable is like a pipeline that emits data over a period, and you can subscribe to it to receive those emissions. Observables can emit different types of values, including primitive data, objects, or even other Observables.

    Observers

    These are the entities that subscribe to Observables and receive the emitted values. They consist of three optional methods: **next**, **error**, and **complete**. The **next** method is called whenever a new value is emitted by the Observable, allowing the Observer to handle the value. The **error** method is invoked if an error occurs during the emission process, and the **complete** method is triggered when the Observable finishes emitting values.
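
    To make the Observer contract concrete, here is a minimal plain-JavaScript sketch. It is not how RxJS is implemented: `produce` is a hypothetical stand-in for an Observable, and `received` is just a log used for illustration. It shows when each of the three callbacks would be called, and that an observer may omit any of them.

```javascript
// An Observer is a plain object with up to three optional callbacks.
const received = [];
const observer = {
    next: value => received.push(`next: ${value}`),
    error: err => received.push(`error: ${err.message}`),
    complete: () => received.push('complete'),
};

// Hypothetical producer standing in for an Observable: it reads values
// from getValues() and reports them through whichever callbacks exist.
function produce(getValues, obs) {
    let values;
    try {
        values = getValues();
    } catch (err) {
        obs.error?.(err);   // failure: report it, and never call complete
        return;
    }
    for (const value of values) obs.next?.(value);
    obs.complete?.();       // success: signal that no more values will come
}

produce(() => [1, 2, 3], observer);
produce(() => { throw new Error('boom'); }, observer);
// All three callbacks are optional: this observer only handles next.
produce(() => ['a'], { next: value => received.push(`only next: ${value}`) });

console.log(received);
// [ 'next: 1', 'next: 2', 'next: 3', 'complete', 'error: boom', 'only next: a' ]
```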

    Operators

    In the realm of RxJS, operators are the essential building blocks that allow us to manipulate, transform, and control the flow of data. They are the tools that enable us to handle complex asynchronous tasks in a more manageable and readable way. With operators, we can filter out unnecessary data, transform data into a more desirable format, control the rate at which data is processed, and much more.

    Subjects

    Subjects are special types of Observables that can act as both Observables and Observers. They allow you to multicast values to multiple Observers simultaneously. Subjects have the ability to emit new values by calling their **next** method, and any subscribed Observers will receive those values. Subjects are useful in scenarios where you need to share data among multiple subscribers or create event buses.
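
    The dual role of a Subject can be sketched in a few lines of plain JavaScript. The `TinySubject` class below is hypothetical and much simpler than the real RxJS Subject (no error or completion handling); it only illustrates the idea of keeping a list of observers and forwarding each value pushed in through **next** to all of them.

```javascript
// Hypothetical minimal subject: NOT the RxJS implementation, just the idea.
class TinySubject {
    constructor() {
        this.observers = [];
    }

    // Observable side: register an observer and return a way to detach it.
    subscribe(observer) {
        this.observers.push(observer);
        return {
            unsubscribe: () => {
                this.observers = this.observers.filter(o => o !== observer);
            },
        };
    }

    // Observer side: push a value in; every current subscriber receives it.
    next(value) {
        this.observers.forEach(o => o.next(value));
    }
}

const seen = [];
const subject = new TinySubject();
subject.subscribe({ next: v => seen.push(`A: ${v}`) });
const subscriptionB = subject.subscribe({ next: v => seen.push(`B: ${v}`) });

subject.next('first');       // delivered to A and B
subscriptionB.unsubscribe();
subject.next('second');      // delivered to A only

console.log(seen); // [ 'A: first', 'B: first', 'A: second' ]
```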

    By leveraging these core concepts, RxJS empowers you to build reactive applications that are more efficient and maintainable. As we go further in this lesson, we will see how each of them works.

    Setting Up Development Area

    For this tutorial, I have created a basic HTML page and imported the RxJS script from a CDN in the head tag. All of the custom code for this lesson will be written in our app.js file, which is linked in the body tag below:

    HTML
    <!DOCTYPE html>
    <html lang="en">
    <head>
        <meta charset="UTF-8">
        <meta http-equiv="X-UA-Compatible" content="IE=edge">
        <meta name="viewport" content="width=device-width, initial-scale=1.0">
        <title>Document</title>
        <script src="https://unpkg.com/rxjs@7.8.1/dist/bundles/rxjs.umd.min.js"></script>
    </head>
    <body>
        <script src="app.js"></script>
    </body>
    </html>
    

    Inside the app.js file, we will create a helper function, print, that appends a value to the page. We will use it to display the values emitted by our observables.

    function print(val) {
        let element = document.createElement('p');
        element.innerText = val;
        element.style.display = 'flex';
        element.style.height = '100%';
        element.style.justifyContent = 'center';
        element.style.alignItems = 'center';
        document.body.appendChild(element);
    }
    

    Observables

    The first thing we are going to explore is Observables, with three core examples. As mentioned earlier, Observables are the bedrock of RxJS. For me, the easiest way to think about an Observable is as an array that gets built over time, which you can loop over along the dimension of time by subscribing to it. We will start by creating an Observable from scratch; there are several ways to create one, depending on where the data comes from.

    Creating an Observable from Scratch

    An Observable can be created by calling **rxjs.Observable.create** (deprecated in recent RxJS versions in favor of **new rxjs.Observable(...)**, which takes the same function). This method takes a function that receives an observer, where you define what the Observable sends to its subscribers. To send a value, you call **observer.next** with that value. The Observable does not start emitting until you call **subscribe** on it.

    JS
    const observable = rxjs.Observable.create(observer => {
        observer.next('semaphore');
    });
    // Subscribe to the Observable
    observable.subscribe(val => print(val));
    
    
    

    In this example, an Observable is created using **rxjs.Observable.create**. This Observable emits ‘semaphore’ as its only value. The **subscribe** method is then used to subscribe to this Observable, so the callback runs whenever the Observable emits a value. In this case, it prints each emitted value to the page.

    Creating an Observable from Events

    An Observable can be created from DOM events, such as clicks, by calling **rxjs.fromEvent** and passing the target (here, the document) and the name of the event you want to observe. Each time you click the page, the mouse event is logged to the console along with all of its associated data.

    JS
    // fromEvent is a creation function exposed on the global rxjs object
    const { fromEvent } = rxjs;
    
    const clicks = fromEvent(document, 'click');
    
    clicks.subscribe(click => console.log(click));

    Creating an Observable from Promises

    If you’re used to working with asynchronous JavaScript and promises, you can convert a promise directly into an Observable. This is extremely useful when you’re working with a JavaScript library that’s built on promises.

    JS
    const promise = new Promise((resolve, reject) => {
        setTimeout(() => {
          resolve('resolved!');
        }, 1000);
    });
    const obsvPromise = rxjs.from(promise);
    obsvPromise.subscribe(result => print(result));
    

    In this example, a Promise is created that resolves with the value ‘resolved!’ after 1 second. An Observable is then created from this Promise using **rxjs.from**. This Observable will emit the value that the Promise resolves with. The Observable is then subscribed to, and the result of the Promise is printed when it resolves.

    Operators

    Operators are functions that transform, filter, or combine Observables to produce new Observables. They allow you to manipulate the emitted values in various ways, such as mapping, filtering, or reducing them. Because each operator returns a new Observable, operators can be chained to build complex transformations out of simple steps. We will look at the operators you are most likely to use in day-to-day development: **map**, **tap** (formerly **do**), **filter**, **first** and **last**, **debounce** and **throttle**, and **catchError** and **retry**.
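
    The idea that an operator takes an Observable and returns a new one can be sketched without the library. In the snippet below, `of`, `map`, `filter`, and `pipe` are hypothetical stand-ins written for illustration, not the RxJS functions of the same names; an "observable" is modeled as a function that pushes values into an observer.

```javascript
// Hypothetical stand-ins, NOT RxJS itself: an "observable" here is simply a
// function that takes an observer and pushes values into it.
const of = (...values) => observer => {
    values.forEach(v => observer.next(v));
    if (observer.complete) observer.complete();
};

// An operator takes a source observable and returns a new observable.
const map = fn => source => observer =>
    source({ ...observer, next: v => observer.next(fn(v)) });

const filter = predicate => source => observer =>
    source({ ...observer, next: v => { if (predicate(v)) observer.next(v); } });

// pipe chains operators left to right, leaving the original source untouched.
const pipe = (source, ...operators) =>
    operators.reduce((current, operator) => operator(current), source);

const results = [];
const evensTimesTen = pipe(
    of(1, 2, 3, 4),
    filter(n => n % 2 === 0),
    map(n => n * 10)
);
evensTimesTen({ next: v => results.push(v) });

console.log(results); // [ 20, 40 ]
```

    The real operators handle errors, completion, and unsubscription as well, but the composition pattern is the same one used with **pipe** in the examples that follow.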

    Map

    The **map** operator allows you to transform each emitted value based on some underlying logic.

    JS
    const numbers = rxjs.of(20, 200, 3000);
    const logNumbers = numbers.pipe(rxjs.operators.map(Math.log));
    logNumbers.subscribe(x => print(x));
    


    In the example above, **rxjs.of** is used to create an Observable that emits the numbers 20, 200, and 3000. The **pipe** method is then used to apply the **map** operator to this Observable, creating a new Observable that emits the natural logarithm of each number.

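    Tap (formerly Do)

    The **tap** operator, named **do** in older versions of RxJS, allows you to run side effects for each emitted value without changing the values that flow through the Observable.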

    JS
    const names = rxjs.of('Marvel', 'Ken');
    names
      .pipe(
        rxjs.operators.tap(name => print(name)),
        rxjs.operators.map(name => name.toUpperCase()),
        rxjs.operators.tap(name => print(name))
      )
      .subscribe();

    In this example, **rxjs.of** is used to create an Observable that emits the names ‘Marvel’ and ‘Ken’. The **pipe** method is then used to apply the **tap** and **map** operators to this Observable. The **tap** operator is used to perform a side effect for each emission from the Observable (in this case, printing each name), and the **map** operator is used to transform each name to uppercase. The **subscribe** method is then used to subscribe to this Observable, which causes it to start emitting values.

    Filter

    As the name implies, the **filter** operator lets through only the values that satisfy a condition. Let’s see an example.

    JS
    const filterNumbers = rxjs.of(-5, -8, -2, 9, 11, -4, -7, 14);
    filterNumbers
      .pipe(rxjs.operators.filter(n => n >= 0))
      .subscribe(n => print(n));

    In our example above, **rxjs.of** is used to create an Observable that emits the numbers -5, -8, -2, 9, 11, -4, -7, and 14. The **pipe** method is then used to apply the **filter** operator to this Observable, creating a new Observable that only emits the numbers that are greater than or equal to 0: 9, 11, and 14.

    First and Last

    Using the same values as above, the **first** and **last** operators allow you to get the first and last values from an Observable. Let’s see how to use them:

    JS
    // filterNumbers is the Observable created in the filter example above
    filterNumbers.pipe(rxjs.operators.first()).subscribe(n => print(n));
    filterNumbers.pipe(rxjs.operators.last()).subscribe(n => print(n));

    In this code, the **filterNumbers** Observable from the filter example, which emits the numbers -5, -8, -2, 9, 11, -4, -7, and 14, is reused. The **pipe** method is then used to apply the **first** and **last** operators to it, creating new Observables that only emit the first number (-5) and the last number (14), respectively.

    Debounce and Throttle

    The **debounce** and **throttle** families of operators (for example **debounceTime** and **throttleTime**) allow you to deal with events that emit far more values than you actually need.

    const { fromEvent } = rxjs;
    const { throttleTime } = rxjs.operators;
    
    const mouseEvents = fromEvent(document, 'mousemove');
    
    // Log to the console rather than the page: mousemove fires very often,
    // so at most one event per second is let through.
    mouseEvents.pipe(
      throttleTime(1000)
    ).subscribe(e => console.log(e.type));

    In the code above, **rxjs.fromEvent** is used to create an Observable that emits a value every time a mousemove event occurs on the document. The **pipe** method is then used to apply the **throttleTime** operator to this Observable, creating a new Observable that emits at most one value every 1000 milliseconds, effectively throttling the events. The **subscribe** method is then used to subscribe to this new Observable, so the callback runs whenever it emits a value.
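
    Since only throttling is shown above, the difference between the two strategies may be easier to see side by side. The sketch below is a simplified model, not the RxJS operators: it works on a hypothetical array of timestamped events instead of real timers, and the functions `throttleSketch` and `debounceSketch` are names made up for this illustration.

```javascript
// Hypothetical timeline of events: t is a timestamp in milliseconds.
const events = [
    { t: 0, value: 'a' },
    { t: 100, value: 'b' },
    { t: 200, value: 'c' },
    { t: 1500, value: 'd' },
    { t: 1600, value: 'e' },
];

// Throttle (leading edge): let a value through, then ignore everything
// that arrives during the next `ms` milliseconds.
function throttleSketch(timeline, ms) {
    const out = [];
    let windowEnd = -Infinity;
    for (const { t, value } of timeline) {
        if (t >= windowEnd) {
            out.push(value);
            windowEnd = t + ms;
        }
    }
    return out;
}

// Debounce: let a value through only if no newer value arrives within
// `ms` milliseconds after it.
function debounceSketch(timeline, ms) {
    return timeline
        .filter((event, i) => {
            const next = timeline[i + 1];
            return !next || next.t - event.t >= ms;
        })
        .map(event => event.value);
}

console.log(throttleSketch(events, 1000)); // [ 'a', 'd' ]
console.log(debounceSketch(events, 1000)); // [ 'c', 'e' ]
```

    In short, throttling keeps the first value of each burst, while debouncing waits for a burst to go quiet and keeps the last one. Debouncing is a common fit for search boxes, and throttling for scroll or mousemove handlers.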

    CatchError and Retry

    Error handling is a crucial aspect of programming, and RxJS is no exception. RxJS provides several operators to help manage and handle errors that might occur during the execution of an Observable. Two of these operators are **catchError** and **retry**.

    The **catchError** operator catches errors on the source Observable and lets you either return a new Observable or throw an error. This allows you to decide how to handle errors: whether you want to continue with a replacement Observable, or rethrow so that the error reaches the subscriber.

    The **retry** operator resubscribes to the source Observable when it emits an error. This means that if the source Observable fails, the **retry** operator will resubscribe to it in the hopes that it will complete successfully.

    Here’s a code example:

    JS
    // Your print function
    function print(val) {
        let element = document.createElement('p');
        element.innerText = val;
        document.body.appendChild(element);
    }
    
    // Create an Observable that will throw an error
    const source = rxjs.throwError(() => 'This is an error!');
    
    // Catch the error and return a new Observable
    const example = source.pipe(
        rxjs.operators.catchError(val => {
            print(`Caught error: ${val}`);
            return rxjs.of(`I caught: ${val}`);
        })
    );
    
    // Subscribe to the example Observable
    example.subscribe(val => print(val));
    
    // Create an Observable that emits a value then throws an error
    const source2 = rxjs.concat(rxjs.of('Hello'), rxjs.throwError(() => 'This is an error!'));
    
    // Retry the Observable twice
    const example2 = source2.pipe(
        rxjs.operators.retry(2)
    );
    
    // Subscribe to the example2 Observable
    example2.subscribe({
        next: val => print(val),
        error: err => print(`Caught error: ${err}`)
    });

    Let’s break down why this matters. Imagine you’re a developer working on a messaging app. In this code, we’re tackling two important scenarios:

    Error Handling: The first part of the code deals with a situation where a message can’t be sent due to an error, like a network problem. The **catchError** operator catches this error and replaces it with a fallback Observable, so the subscriber receives a normal value instead of a failure. This is vital for the app because it ensures that when errors occur, users are informed, and the app can offer options like retrying the message or showing a helpful error message.

    Retry Mechanism: The second part of the code represents a scenario where sending a message starts but fails partway through. To improve delivery reliability, we use the **retry** operator, which resubscribes to the source so the app attempts to send the message again (here, up to two more times). If every attempt fails, the error finally reaches the subscriber’s error callback. This gives a message that hits a temporary issue another chance to be delivered, improving the user experience.
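
    To make the resubscription behavior concrete, here is a minimal plain-JavaScript sketch of the idea behind **retry**. It is not the RxJS implementation: `flakySource` and `subscribeWithRetry` are hypothetical helpers, and an "observable" is modeled as a function that receives an observer. It mirrors what **retry(2)** does with a source that always fails: the initial subscription plus two retries, so ‘Hello’ appears three times before the error is reported.

```javascript
// Hypothetical stand-in for the source: emits 'Hello', then fails.
// It also counts how many times it has been (re)subscribed to.
let subscriptions = 0;
function flakySource(observer) {
    subscriptions++;
    observer.next('Hello');
    observer.error('This is an error!');
}

// Sketch of retry: on error, resubscribe up to `retries` more times,
// and only then pass the error on to the real observer.
function subscribeWithRetry(source, retries, observer) {
    let failures = 0;
    const attempt = () => source({
        next: value => observer.next(value),
        error: err => {
            failures++;
            if (failures <= retries) attempt();
            else observer.error(err);
        },
    });
    attempt();
}

const output = [];
subscribeWithRetry(flakySource, 2, {
    next: value => output.push(value),
    error: err => output.push(`Caught error: ${err}`),
});

console.log(subscriptions); // 3
console.log(output);
// [ 'Hello', 'Hello', 'Hello', 'Caught error: This is an error!' ]
```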

    Subjects

    Subjects serve as a bridge between Observables and Observers, embodying both characteristics. Because a Subject can emit data to multiple Observers at once, it is a powerful tool when you need to share a single data source among multiple subscribers. In other words, Subjects are the megaphones of the RxJS world, broadcasting data to all who are willing to listen.

    Creating a Subject

    A Subject is an Observable that can also push new data to its subscribers, which lets it act as a proxy for some other data source.

    JS
    const subject = new rxjs.Subject();
    subject.subscribe(val => print(`Observer A: ${val}`));
    subject.subscribe(val => print(`Observer B: ${val}`));
    subject.next('semaphore');

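    In the code above, **rxjs.Subject** is used to create a new Subject. The **subscribe** method is then called twice, registering two observers that react whenever the Subject emits a value. Calling **next** with ‘semaphore’ delivers that value to both, so the page shows “Observer A: semaphore” and “Observer B: semaphore”.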

    Multicast

    Multicasting sends the same values to multiple subscribers without repeating the source’s side effects for each of them. This is in contrast to the default behavior of Observables, which are unicast, meaning each subscriber gets its own execution of the Observable.

    In other words, when you subscribe to a regular (unicast) Observable, the Observable’s function is executed anew for each subscription. This means that each subscriber gets its own independent set of values from the Observable.
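
    A small plain-JavaScript sketch can illustrate the unicast behavior. Here `unicastSource` is a hypothetical function standing in for an Observable (it is not RxJS code), and `executions` counts how often its producer logic runs.

```javascript
// Hypothetical unicast "observable": a function that runs its producer
// logic from scratch for every observer passed to it.
let executions = 0;
function unicastSource(observer) {
    executions++;                        // side effect of the producer
    observer.next(`run #${executions}`);
}

const deliveries = [];
unicastSource({ next: v => deliveries.push(`A got ${v}`) });
unicastSource({ next: v => deliveries.push(`B got ${v}`) });

console.log(executions); // 2
console.log(deliveries); // [ 'A got run #1', 'B got run #2' ]
```

    Two subscribers mean two executions, and each subscriber sees the result of its own run. If the producer made a network request, that request would be sent twice.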

    With multicasting, however, the Observable’s function is executed once, and the values it emits are shared among all subscribers. This is like broadcasting a single event (like a radio show) to multiple listeners.

    In RxJS, multicasting is achieved using Subjects, which are both Observables and Observers. You can subscribe to a Subject as you would to an Observable, and you can also push values into a Subject so they can be observed.

    Here’s a simple example:

    JS
    const clicks = Rx.Observable.fromEvent(document, 'click');
    const source = clicks.do(_ => print('Reactive programming made easy!'));
    const subject = source.multicast(() => new Rx.Subject());
    subject.subscribe(c => print(`Clicks: ${c.clientX}`));
    subject.subscribe(c => print(`Clicks: ${c.clientX}`));
    subject.connect();
    

    In the code above, **Rx.Observable.fromEvent** is used to create an Observable that emits a value every time a click event occurs on the document, and the **do** operator adds a side effect to it. The **multicast** operator is then used to route this Observable through a Subject, and the two **subscribe** calls attach observers to it. Nothing is emitted until **connect** is called.

    Formerly, this is how the code would be written. With the RxJS version loaded on our page, however, it throws an error, because the library’s API changed significantly from version 5 to version 6 and onwards.

    Here’s how you can fix the code:

    Use the **pipe** method to chain operators, replace **do** with **tap**, and replace **multicast** with **share**. Here’s a refactored version of the code:

    const { tap, share } = rxjs; // fromEvent comes from the earlier destructuring of rxjs
    
    const clicks = fromEvent(document, 'click');
    
    const source = clicks.pipe(
      tap(_ => print('Reactive programming made easy!')),
      share()
    );
    
    source.subscribe(c => print(`Clicks: ${c.clientX}`));
    source.subscribe(c => print(`Clicks: ${c.clientX}`));
    
    
    

    Unsubscribe

    Unsubscribing means detaching a subscriber (or observer) from an Observable so that it stops receiving values you’re no longer interested in, which prevents potential memory leaks.

    Here is a practical example of what unsubscribing looks like, using a small hand-written Observable rather than the RxJS one:

    // Simple Observable implementation
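    function Observable(subscribeFn) {
        // Keep the subscribe logic under a separate name so it does not
        // shadow the prototype method below.
        this._subscribe = subscribeFn;
    }
    
    Observable.prototype = {
        // Runs the subscribe logic and returns whatever it returns
        // (here, an object with an unsubscribe method).
        subscribe: function(observer) {
            return this._subscribe(observer);
        }
    };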
    
    // Create an observable that emits values at intervals
    
    function intervalObservable(ms) {
        return new Observable(function(observer) {
            let count = 0;
            const intervalId = setInterval(() => {
                observer.next(count++);
            }, ms);
    
            // Return a subscription object whose unsubscribe method clears the interval
            return {
                unsubscribe: function() {
                    clearInterval(intervalId);
                }
            };
        });
    }
    
    // print function
    function print(val) {
        let element = document.createElement('p');
        element.innerText = val;
        element.style.display = 'flex';
        element.style.height = '100%';
        element.style.justifyContent = 'center';
        element.style.alignItems = 'center';
        document.body.appendChild(element);
    }
    
    // Usage
    const observable = intervalObservable(1000); // emits a value every 1 second
    
    const subscription = observable.subscribe({
        next: function(value) {
            print(value);
            if (value >= 5) { // Just for this example, unsubscribe once the value reaches 5 (the sixth emission)
                subscription.unsubscribe();
                print('Unsubscribed!');
            }
        }
    });
    

    In the code example, the intervalObservable function creates an Observable that emits values at regular intervals. When you subscribe to this Observable, it starts emitting values. The subscription object returned has an unsubscribe method which clears the interval, effectively stopping the emissions.

    Best Practices

    When working with RxJS, it’s not just about understanding the basics. To truly leverage the power of RxJS and write efficient, maintainable code, it’s crucial to follow certain best practices. These practices will help you avoid common mistakes, write cleaner code, and make the most of what RxJS has to offer.

    • Use Observables for Asynchronous Data: RxJS is designed to work with asynchronous data streams, making it an excellent choice for handling events, API calls, and promises.
    • Leverage Operators: RxJS comes with a wide array of operators that can be used to manipulate and handle data streams. These include **map**, **filter**, **first**, **last**, and many more. Understanding and effectively using these operators is key to getting the most out of RxJS.
    • Use Subjects for Multicasting: Subjects in RxJS are a special type of Observable that can multicast to many observers. This means that Subjects are capable of broadcasting data to multiple subscribers, making them useful in many scenarios.
    • Handle Errors Properly: RxJS provides operators like **catchError** and **retry** to handle errors in observables. It’s important to use these operators to ensure your application can recover from errors and continue to function correctly.
    • Prevent Memory Leaks with Unsubscribe: Some Observables may not complete on their own, leading to potential memory leaks. Always remember to unsubscribe from Observables when they are no longer needed.

    Conclusion

    RxJS offers a powerful and flexible approach to handling asynchronous operations in JavaScript. The core concepts of Observables, Observers, and Subjects, along with a rich set of operators, provide developers with the tools to create complex data streams and handle them in a manageable way.

    This tutorial has taken a practical, hands-on approach to these concepts. We walked through the creation of Observables from various sources, demonstrated the use of operators to manipulate data streams, and highlighted the importance of Subjects for multicasting values. At this point, you are ready to do a great job with this tool. Thanks for reading this far, and have fun coding!


    Written by:
    I have successfully delivered multiple projects on topics such as user guides, blog posts, tutorials, and documentation, using my web development skills and knowledge of different technologies. I graduated with a BSc in Sport Science from the University of Portharcourt in March 2023. I enjoy learning new technologies and writing about them, as well as programming and creating videos. My goal is to combine my passion for web development and writing, and to contribute to the online community with valuable and relevant content.
    Reviewed by:
    I picked up most of my soft/hardware troubleshooting skills in the US Army. A decade of Java development drove me to operations, scaling infrastructure to cope with the thundering herd. Engineering coach and CTO of Teleclinic.