5 Sep 2023 · Software Engineering

    Building Scalable Applications Using Redis as a Message Broker

    Redis is an open-source, in-memory datastore. You can use Redis as a messaging system by employing one of its three features: Streams, Pub/Sub, and Lists. Additionally, Redis offers Transaction and Pipeline capabilities to help build performant applications at scale. Lua scripting, server-side functions, and modules further enable you to extend Redis’s functionality.

    In this article, we will discuss the Redis data structures that can be employed to build messaging solutions, each illustrated with a sample application written in Go.

    What Is Asynchronous Messaging, and How Does It Help with Application Scalability?

    Asynchronous messaging is a communication pattern in which applications exchange messages without requiring immediate responses. The sender and receiver are decoupled, which allows them to operate independently of each other. In this paradigm, the sender publishes messages to a messaging system or message broker, which then delivers the messages to one or more recipients. This approach offers several advantages for application scalability.

    First, asynchronous messaging enables loose coupling between components, as they don’t need to be aware of each other’s existence. This makes it possible to scale different system components independently, with each one processing messages at its own pace. Second, it helps to mitigate performance bottlenecks by offloading time-consuming tasks to background processes, thereby preventing the main application from becoming unresponsive. By distributing workloads across multiple asynchronous message handlers, applications can accommodate a higher volume of requests. Finally, the same decoupling improves fault tolerance and resilience: failures or slowdowns in one component do not directly impact others.

    Overall, asynchronous messaging promotes scalability through parallel processing, load balancing, fault tolerance, and the independent scaling of system components.

    An Overview of Messaging Data Structures in Redis

    This section offers a high-level overview of Redis Lists, Pub/Sub, and Streams, along with some essential commands associated with each.

    Redis List

    A Redis List is a versatile data structure that enables the storage and manipulation of ordered collections of elements. Lists in Redis are implemented as linked lists, providing efficient insertion and deletion operations at both ends of the list.

    Redis furnishes a rich set of commands for list operations, including the following:

    • LPUSH: Inserts one or more elements at the beginning of a list. This command can create new lists or add elements to existing ones.
    • RPUSH: Appends one or more elements at the end of a list. Like LPUSH, it can be used for new or existing lists.
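    • LPOP: Removes and returns the element at the beginning of a list. Paired with RPUSH, it gives queue-like (first in, first out) behavior; paired with LPUSH, stack-like behavior.
    • RPOP: Removes and returns the element at the end of a list. Paired with LPUSH, it gives queue-like behavior (the combination used later in this article); paired with RPUSH, stack-like behavior.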
    • LLEN: Returns the length of a list, indicating the total number of elements.
    • LRANGE: Retrieves a range of elements from a list based on their indices.
    • LINDEX: Returns the element at a specific index in the list.
    • LINSERT: Inserts an element before or after a specified element in the list.
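
    To make the push and pop directions concrete, here is a minimal sketch of an in-memory stand-in for a Redis List. It is not Redis client code: the list type and its lpush, lpop, and rpop methods are hypothetical names that only mimic the ordering of the corresponding commands.

```go
package main

import "fmt"

// list is a tiny in-memory stand-in for a Redis List (not a Redis client).
// Index 0 is the head (left end); the last index is the tail (right end).
type list struct{ items []string }

// lpush mimics LPUSH: each value is inserted at the head, one at a time.
func (l *list) lpush(values ...string) {
	for _, v := range values {
		l.items = append([]string{v}, l.items...)
	}
}

// lpop mimics LPOP: remove and return the head element.
func (l *list) lpop() (string, bool) {
	if len(l.items) == 0 {
		return "", false
	}
	v := l.items[0]
	l.items = l.items[1:]
	return v, true
}

// rpop mimics RPOP: remove and return the tail element.
func (l *list) rpop() (string, bool) {
	if len(l.items) == 0 {
		return "", false
	}
	v := l.items[len(l.items)-1]
	l.items = l.items[:len(l.items)-1]
	return v, true
}

func main() {
	queue := &list{}
	queue.lpush("job1", "job2", "job3")
	first, _ := queue.rpop()
	fmt.Println("LPUSH + RPOP (queue):", first) // the oldest element comes out first

	stack := &list{}
	stack.lpush("job1", "job2", "job3")
	top, _ := stack.lpop()
	fmt.Println("LPUSH + LPOP (stack):", top) // the newest element comes out first
}
```

    Pushing and popping at opposite ends yields a queue, while pushing and popping at the same end yields a stack.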

    Redis Pub/Sub

    Redis Pub/Sub (Publish/Subscribe) is a messaging pattern that facilitates the broadcasting of messages to multiple subscribers. In this model, a Redis client can act as both a publisher, sending messages to channels, and a subscriber, receiving messages from those channels. Channels are string-based identifiers that serve as communication conduits.

    When a publisher disseminates a message to a channel, Redis delivers it to all connected subscribers of that channel. Subscribers can follow multiple channels, enabling concurrent reception of messages from various sources. Although Redis Pub/Sub provides a high-performance message bus, it is ephemeral, meaning messages are not stored for offline consumption.

    The following are key commands associated with Redis Pub/Sub:

    • PUBLISH: Sends messages to specific channels.
    • SUBSCRIBE: Allows clients to follow one or more channels.
    • UNSUBSCRIBE: Enables clients to stop following specific channels.
    • PSUBSCRIBE: Permits clients to subscribe to channels using pattern matching.
    • PUNSUBSCRIBE: Unsubscribes clients from channels followed through PSUBSCRIBE.
    • PUBSUB: Provides information about the Pub/Sub system in Redis.
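
    The fire-and-forget nature of Pub/Sub can be illustrated with a small in-memory model. This is only a sketch under simplifying assumptions (a single process, buffered inboxes, no unsubscribe); the broker type and its subscribe and publish methods are hypothetical names, not the Redis API.

```go
package main

import "fmt"

// broker is an in-memory stand-in for Redis Pub/Sub (not a Redis client).
type broker struct {
	subs map[string][]chan string // channel name -> subscriber inboxes
}

func newBroker() *broker {
	return &broker{subs: map[string][]chan string{}}
}

// subscribe mimics SUBSCRIBE and returns the subscriber's inbox.
func (b *broker) subscribe(channel string) <-chan string {
	inbox := make(chan string, 16) // a small buffer keeps the sketch simple
	b.subs[channel] = append(b.subs[channel], inbox)
	return inbox
}

// publish mimics PUBLISH: the message goes to the current subscribers only,
// nothing is stored, and the number of receivers is returned.
func (b *broker) publish(channel, msg string) int {
	for _, inbox := range b.subs[channel] {
		inbox <- msg
	}
	return len(b.subs[channel])
}

func main() {
	b := newBroker()

	// Nobody is subscribed yet, so this message is simply dropped.
	fmt.Println("receivers:", b.publish("demo-chat", "anyone here?"))

	inbox := b.subscribe("demo-chat")
	fmt.Println("receivers:", b.publish("demo-chat", "hello again"))
	fmt.Println("received:", <-inbox)
}
```

    The first message never reaches the subscriber because it was published before the subscription existed, which is the behavior to keep in mind when choosing Pub/Sub.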

    Redis Streams

    Redis Streams is an append-only log data structure designed for managing high-volume, real-time data streams. Streams offer a way to model and process continuous data flows in a fault-tolerant and scalable manner. Each entry in a stream has a unique ID and consists of one or more field-value pairs, allowing for the inclusion of additional metadata with the message payload.

    Here are some essential commands related to Redis Streams:

    • XADD: Appends a new entry to a stream.
    • XLEN: Returns the total number of entries in a stream.
    • XREAD: Enables consumers to read entries from one or more streams.
    • XGROUP: Manages consumer groups within a stream.
    • XREADGROUP: Similar to XREAD, but designed for consumer groups.
    • XACK: Acknowledges the successful processing of an entry by a consumer.
    • XDEL: Deletes one or more entries from a stream based on their IDs.
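
    Entry IDs have the form <milliseconds>-<sequence>, such as the 1686365267267-0 that appears later in this article, and their ordering follows those two numbers. The sketch below parses and compares such IDs; streamID, parseStreamID, and less are hypothetical helper names, and only the full two-part form is handled.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// streamID models a Redis Stream entry ID of the form <milliseconds>-<sequence>.
type streamID struct {
	ms  uint64 // millisecond timestamp part
	seq uint64 // sequence number within that millisecond
}

// parseStreamID parses a full entry ID such as "1686365267267-0".
func parseStreamID(s string) (streamID, error) {
	msPart, seqPart, ok := strings.Cut(s, "-")
	if !ok {
		return streamID{}, fmt.Errorf("stream ID %q is missing the '-' separator", s)
	}
	ms, err := strconv.ParseUint(msPart, 10, 64)
	if err != nil {
		return streamID{}, fmt.Errorf("bad millisecond part in %q: %w", s, err)
	}
	seq, err := strconv.ParseUint(seqPart, 10, 64)
	if err != nil {
		return streamID{}, fmt.Errorf("bad sequence part in %q: %w", s, err)
	}
	return streamID{ms: ms, seq: seq}, nil
}

// less reports whether a sorts before b: first by time, then by sequence.
func (a streamID) less(b streamID) bool {
	if a.ms != b.ms {
		return a.ms < b.ms
	}
	return a.seq < b.seq
}

func main() {
	first, err := parseStreamID("1686365267267-0")
	if err != nil {
		fmt.Println(err)
		return
	}
	second, err := parseStreamID("1686365268153-0")
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println("first is older than second:", first.less(second))
}
```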

    By understanding and utilizing these Redis data structures and commands, developers can effectively implement scalable messaging solutions.

    Choosing the Right Messaging Option in Redis

    With several options available, selecting the right one for your needs is crucial. When choosing between Redis List, Pub/Sub, and Redis Streams, consider the particular requirements of your use case. For instance, if your application needs a load-balanced queue, Redis List is a sensible choice, as Redis Streams could introduce unwarranted complexity. On the other hand, if your project demands stream processing semantics, coupled with persistence and flexible traversal, Redis Streams is the better fit. Finally, if the ephemeral nature of Redis Pub/Sub suits your needs and you’re aiming for maximum performance, Redis Pub/Sub is the way to go.

    Below is a table that summarizes the advantages and disadvantages of these solutions:

    | Data Structure | Benefits | Drawbacks |
    | List | Easy to use; already used by existing open-source solutions; offers a reliable-queue variant (BRPOPLPUSH, superseded by BLMOVE since Redis 6.2). | Does not support stream processing semantics. |
    | Pub/Sub | Can handle high message volumes. | Messages are ephemeral, i.e., consumers must be subscribed at the time a message is published in order to receive it. |
    | Redis Streams | First-class stream processing capabilities and flexible stream traversal, along with advanced features such as consumer groups, auto-claiming, and observability. | Complex data structure. Use Lists or Pub/Sub if advanced functionality is not required. |

    Deep Dive into Redis Messaging

    Now that we have a basic overview of the messaging-related data structures available in Redis, let’s learn how to use them from Go. You will need a recent version of Go and a Redis instance running locally. For example, you can use the following command to start a Redis instance in a Docker container:

    docker run --rm -it --name redis -p 6379:6379 redis

    You can use redis-cli to interact with Redis from the command line. It is installed automatically when you set up Redis for your operating system. If you’re running Redis in Docker, you can access redis-cli directly from the container (as shown in the subsequent sections).

    Create a new Go module and add the required dependencies:

    go mod init go-redis-messaging-data-structures 
    go get github.com/redis/go-redis/v9

    Redis List

    Redis Lists support blocking operations like BLPOP and BRPOP, which allow applications to wait for a specified duration if the list is empty. Once an item is added to the list (typically via the LPUSH command issued by another application), it can be immediately fetched and processed. This decoupling enables the creation of asynchronous and event-driven architectures, often referred to as the worker queue pattern, where the list serves as a queue.

    Producers populate the queue with items, and worker applications (or consumers) dequeue and process them. By utilizing Redis Lists and their blocking operations, developers can craft efficient and scalable systems for task distribution, message queuing, and background job processing.

    Frameworks like Celery and Sidekiq support Redis as a backend for background tasks.

    Here’s an example demonstrating the use of a list as a job-processing queue (for processing user data, for example):

    package main
    
    import (
    	"context"
    	"errors"
    	"fmt"
    	"log"
    	"time"
    
    	"github.com/redis/go-redis/v9"
    )
    
    const listName = "demo-list"
    
    func main() {
    	fmt.Println("list consumer application started")
    
    	client := redis.NewClient(&redis.Options{Addr: "localhost:6379"})
    	_, err := client.Ping(context.Background()).Result()
    
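    	if err != nil {
    		log.Fatal("failed to connect: ", err)
    	}
    	for {
    		// Block for up to 2 seconds waiting for an element at the tail of the list.
    		data, err := client.BRPop(context.Background(), 2*time.Second, listName).Result()
    		if err != nil {
    			if errors.Is(err, redis.Nil) {
    				continue // timeout: the list stayed empty
    			}
    			log.Println("brpop operation failed", err)
    			continue // data is nil on error, so skip to the next iteration
    		}
    
    		// data[0] is the list name, data[1] is the popped element.
    		fmt.Println("received data from list -", data[1])
    	}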
    }

    Before we run the program, let’s get an understanding of what’s going on.

    • We start by connecting to Redis. In this case it’s a local instance on localhost and port 6379.
    • We Ping Redis to check the connection to the Redis server. If there is an error, the program exits.
    • A loop is started to continuously consume data from the Redis List.
      • Within the loop, the BRPop function is used to block and wait for the next available element to be popped from the Redis List. It takes a context, a timeout duration (2 seconds in this case), and the name of the list. If there are no elements available within the timeout, the command returns a redis.Nil error. If there is any other error, it is logged, but the program continues to the next iteration.
      • If there is data returned by the BRPop function, it is printed to the console – BRPop returns a slice where the first element is the name of the list and the second element is the popped data.
    • Finally, the program goes back to the beginning of the loop to consume the next element from the Redis List.
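
    The decision made on each loop iteration can be isolated in a small helper. The following is a sketch that uses only the standard library: errTimeout is a stand-in for redis.Nil, and handleReply is a hypothetical name, not part of go-redis.

```go
package main

import (
	"errors"
	"fmt"
)

// errTimeout stands in for redis.Nil, which go-redis returns when BRPOP times out.
var errTimeout = errors.New("brpop timed out")

// handleReply decides what one loop iteration should do with a BRPOP-style
// result. It returns the popped value and true only when there is data to process.
func handleReply(reply []string, err error) (string, bool) {
	switch {
	case errors.Is(err, errTimeout):
		return "", false // the list was empty for the whole timeout: poll again
	case err != nil:
		fmt.Println("brpop operation failed:", err)
		return "", false // log and retry; the reply is not usable here
	case len(reply) != 2:
		fmt.Println("unexpected reply:", reply)
		return "", false // a valid reply is [listName, value]
	default:
		return reply[1], true
	}
}

func main() {
	if value, ok := handleReply([]string{"demo-list", "user1"}, nil); ok {
		fmt.Println("received data from list -", value)
	}
	if _, ok := handleReply(nil, errTimeout); !ok {
		fmt.Println("timed out, polling again")
	}
	if _, ok := handleReply(nil, errors.New("connection reset")); !ok {
		fmt.Println("error logged, polling again")
	}
}
```

    Checking the reply length before indexing keeps a failed call from crashing the consumer.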

    To run the above program, copy it to a file named main.go and execute the following command:

    go run main.go

    Produce data using redis-cli.

    docker exec -it redis redis-cli lpush demo-list user1
    docker exec -it redis redis-cli lpush demo-list user2

    You should see the following output:

    list consumer application started
    received data from list - user1
    received data from list - user2

    Start another instance of the consumer application in a new terminal window.

    go run main.go

    Continue to produce data using redis-cli.

    docker exec -it redis redis-cli lpush demo-list user3
    docker exec -it redis redis-cli lpush demo-list user4
    docker exec -it redis redis-cli lpush demo-list user5
    docker exec -it redis redis-cli lpush demo-list user6

    These messages will be consumed by the two consumer instances in a load-balanced way. For example, the second consumer application instance might receive user3 and user5, while the first instance might receive user4 and user6.

    You can also start additional consumer instances to see how the messages are distributed among them.
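
    The competing-consumers behavior seen above can be modeled without Redis. In this sketch, a buffered Go channel stands in for the list and goroutines stand in for the consumer instances; runWorkers is a hypothetical helper, and it assumes job names are unique.

```go
package main

import (
	"fmt"
	"sync"
)

// runWorkers drains jobs with n competing workers and returns which worker
// handled each job. Job names are assumed to be unique.
func runWorkers(jobs []string, n int) map[string]int {
	queue := make(chan string, len(jobs)) // stands in for the Redis List
	for _, j := range jobs {
		queue <- j // producer side (LPUSH in the article)
	}
	close(queue)

	var (
		mu      sync.Mutex
		wg      sync.WaitGroup
		handled = make(map[string]int)
	)
	for w := 1; w <= n; w++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			for job := range queue { // consumer side (BRPOP in the article)
				mu.Lock()
				handled[job] = id
				mu.Unlock()
			}
		}(w)
	}
	wg.Wait()
	return handled
}

func main() {
	handled := runWorkers([]string{"user3", "user4", "user5", "user6"}, 2)
	fmt.Println("jobs processed:", len(handled))
	for job, worker := range handled {
		fmt.Printf("%s was handled by worker %d\n", job, worker)
	}
}
```

    Every job is handled exactly once, but which worker handles it varies from run to run, just as with the two consumer instances.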

    Redis Pub/Sub

    Redis Pub/Sub offers a high-performance messaging system. When integrated with other technologies such as WebSocket, it can be employed to construct real-time applications, including chat platforms, stock tickers, and more.

    WebSocket is a communication protocol outlined in RFC 6455 that facilitates bidirectional, full-duplex communication between a client and a server over a single TCP connection. Full-duplex communication means that both the client and server can send messages to each other at any time, enabling simultaneous message exchange. This offers an improvement over other communication patterns, such as long polling, which are commonly used in HTTP-based solutions, particularly for real-time applications. By establishing a persistent connection, WebSocket eliminates the need for repeated HTTP requests, thereby allowing for efficient and instantaneous communication between the client and server.

    In a chat-like application, WebSocket and Redis Pub/Sub can be combined to offer a highly scalable architecture that accommodates numerous concurrent users. Redis Pub/Sub can address some of WebSocket’s limitations, including stateful connections, which can impede the system’s scalability by restricting it to a single application instance that users can connect to via a WebSocket session. When scaling out horizontally with multiple application instances, Redis Pub/Sub aids by broadcasting messages to all connected users, irrespective of the application instance to which they are connected.

    Example: Building a Chat Application with WebSocket and Pub/Sub

    package main
    
    import (
    	"context"
    	"fmt"
    	"log"
    	"net/http"
    	"os"
    	"os/signal"
    	"strings"
    	"syscall"
    	"time"
    
    	"github.com/gorilla/websocket"
    	"github.com/redis/go-redis/v9"
    )
    
    var client *redis.Client
    
    var Users map[string]*websocket.Conn
    var sub *redis.PubSub
    
    const channelName = "demo-chat"
    
    func init() {
    	Users = map[string]*websocket.Conn{}
    }
    
    func main() {
    
    	client = redis.NewClient(&redis.Options{Addr: "localhost:6379"})
    	_, err := client.Ping(context.Background()).Result()
    
    	if err != nil {
    		log.Fatal("ping failed. could not connect", err)
    	}
    
    	go func() {
    		sub = client.Subscribe(context.Background(), channelName)
    		messages := sub.Channel()
    		for message := range messages {
    			// Split on the first ":" only, so colons inside the message text are preserved.
    			from, text, ok := strings.Cut(message.Payload, ":")
    			if !ok {
    				continue // ignore payloads that are not in "user:text" form
    			}
    
    			for user, peer := range Users {
    				if from != user {
    					msg := "[" + from + " says]: " + text
    					peer.WriteMessage(websocket.TextMessage, []byte(msg))
    				}
    			}
    		}
    	}()
    
    	http.HandleFunc("/chat/", chat)
    	server := http.Server{Addr: ":8080", Handler: nil}
    
    	go func() {
    		fmt.Println("chat server started")
    		err := server.ListenAndServe()
    		if err != nil && err != http.ErrServerClosed {
    			log.Fatal("failed to start server", err)
    		}
    	}()
    
    	exit := make(chan os.Signal, 1)
    	signal.Notify(exit, syscall.SIGTERM, syscall.SIGINT)
    	<-exit
    
    	fmt.Println("exit signalled")
    
    	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
    	defer cancel()
    
    	for _, conn := range Users {
    		conn.Close()
    	}
    
    	sub.Unsubscribe(context.Background(), channelName)
    	sub.Close()
    
    	server.Shutdown(ctx)
    
    	fmt.Println("chat application closed")
    }
    
    var upgrader = websocket.Upgrader{}
    
    func chat(w http.ResponseWriter, r *http.Request) {
    	user := strings.TrimPrefix(r.URL.Path, "/chat/")
    
    	upgrader.CheckOrigin = func(r *http.Request) bool {
    		return true
    	}
    	c, err := upgrader.Upgrade(w, r, nil)
    	if err != nil {
    		// Upgrade has already replied with an HTTP error; log it and keep serving other users.
    		log.Println("protocol upgrade error", err)
    		return
    	}
    
    	Users[user] = c
    	fmt.Println(user, "joined the chat")
    
    	for {
    		_, message, err := c.ReadMessage()
    		if err != nil {
    			_, ok := err.(*websocket.CloseError)
    			if ok {
    				fmt.Println("connection closed by:", user)
    				err := c.Close()
    				if err != nil {
    					fmt.Println("close connection error", err)
    				}
    				delete(Users, user)
    				fmt.Println("connection and user session closed")
    			}
    			break
    		}
    		// Capture the publish error; otherwise the check below inspects the stale read error.
    		err = client.Publish(context.Background(), channelName, user+":"+string(message)).Err()
    		if err != nil {
    			fmt.Println("publish error", err)
    		}
    	}
    }

    Before running the chat application, let’s understand its components:

    1. The Users map associates usernames with their respective WebSocket client session objects.
    2. We start by connecting to a local Redis instance on localhost and port 6379, using Ping to verify the connection. If an error occurs, the program exits.
    3. A goroutine is initiated to subscribe to the Redis Pub/Sub channel. This goroutine listens for incoming messages and broadcasts them to all connected users, excluding the sender.
    4. An HTTP endpoint is registered using http.HandleFunc(), mapped to the /chat/ route. This endpoint handles WebSocket connections for users joining the chat.
    5. An HTTP server is set up via http.Server, configured to listen on port 8080 and launched in its dedicated goroutine.
      • An exit channel is created to capture termination signals (SIGTERM and SIGINT).
      • The program remains idle until a termination signal is received on this channel.
    6. Upon receiving the termination signal, the program commences a graceful shutdown: it closes all WebSocket connections, unsubscribes from the Redis Pub/Sub channel, and shuts down the HTTP server within a timeout of 10 seconds.
    7. The chat() function manages WebSocket connections for users joining the chat. It extracts the username from the URL path and upgrades the HTTP connection to a WebSocket connection.
      • This function adds the WebSocket connection to the Users map and starts listening for incoming messages from the user.
      • When a message arrives, it is published to the Redis Pub/Sub channel with the username as a prefix. The subscriber goroutine described in step 3 then broadcasts it to all connected users, except for the sender.
      • If an error arises during message reading, the function checks for a close error. If found, the function terminates the connection, removes the user from the Users map, and ends the user’s session.
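
    Because the Users map is touched by the subscriber goroutine and by every chat() handler, one option is to guard it with a mutex. The sketch below shows that idea with the standard library only; registry, sender, and fakeConn are hypothetical names, and sender is assumed to match the WriteMessage method of *websocket.Conn.

```go
package main

import (
	"fmt"
	"strings"
	"sync"
)

// sender is the subset of *websocket.Conn this sketch relies on.
type sender interface {
	WriteMessage(messageType int, data []byte) error
}

const textMessage = 1 // same value as websocket.TextMessage

// registry is a mutex-guarded replacement for the plain Users map.
type registry struct {
	mu    sync.RWMutex
	peers map[string]sender
}

func newRegistry() *registry {
	return &registry{peers: map[string]sender{}}
}

func (r *registry) add(user string, c sender) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.peers[user] = c
}

func (r *registry) remove(user string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	delete(r.peers, user)
}

// broadcast parses a "user:text" payload and sends it to everyone except the
// sender. It returns the number of peers the message was written to.
func (r *registry) broadcast(payload string) int {
	from, text, ok := strings.Cut(payload, ":")
	if !ok {
		return 0 // not in "user:text" form
	}
	msg := []byte("[" + from + " says]: " + text)

	r.mu.RLock()
	defer r.mu.RUnlock()
	sent := 0
	for user, peer := range r.peers {
		if user == from {
			continue
		}
		if err := peer.WriteMessage(textMessage, msg); err == nil {
			sent++
		}
	}
	return sent
}

// fakeConn records messages so the sketch runs without a real WebSocket.
type fakeConn struct{ got []string }

func (f *fakeConn) WriteMessage(_ int, data []byte) error {
	f.got = append(f.got, string(data))
	return nil
}

func main() {
	r := newRegistry()
	shaun, murphy := &fakeConn{}, &fakeConn{}
	r.add("shaun", shaun)
	r.add("murphy", murphy)

	r.broadcast("shaun:hello there")
	fmt.Println("murphy sees:", murphy.got)
	fmt.Println("shaun sees:", shaun.got)
}
```

    In the chat application, the handler would call add and remove where it currently writes to Users, and the subscriber goroutine would call broadcast with message.Payload.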

    To run the above program, copy it to a file named main.go and execute the following command:

    go get github.com/gorilla/websocket
    go run main.go

    You should see the following output: “chat server started”.

    To test the application, use any WebSocket client. In this article, I will be using the command line client wscat.

    Join the chat as a new user. In a new terminal:

    wscat --connect ws://localhost:8080/chat/shaun

    Once connected, you should see this output:

    Connected (press CTRL+C to quit)
    >

    Open another terminal and join as a different user:

    wscat --connect ws://localhost:8080/chat/murphy

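    Now you can exchange messages between these two users using the respective terminals. For example, a message that shaun sends shows up in murphy’s terminal with the prefix [shaun says]: in front of the text.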

    Redis Streams

    Consumer groups in Redis Streams enable the distribution of the workload for message consumption across multiple consumers. When several consumers belong to a consumer group, each is allocated a subset of the stream’s messages to process. This design fosters parallel and distributed processing of the stream, enhancing throughput and facilitating scalability. Consumers within a group operate independently and handle messages concurrently, making the architecture well-suited for high-performance and fault-tolerant systems.

    To establish a consumer group, use the XGROUP CREATE command. It takes the stream name, the group name, and a starting ID: $ to deliver only entries added after the group is created, or 0 to deliver the stream’s entire history. Consumers do not need to be registered separately: a consumer is created automatically the first time its name is used in XREADGROUP (it can also be created explicitly with XGROUP CREATECONSUMER). The XGROUP SETID command serves a different purpose: it changes the group’s last-delivered ID, for example to make the group reprocess the stream from an earlier point.

    Messages are consumed through a consumer group with the XREADGROUP command. It takes the group name, the consumer name, and one or more stream names, each paired with an ID. The special ID > requests entries that have never been delivered to any consumer in the group, while a concrete ID such as 0 returns the entries already delivered to this consumer but not yet acknowledged (its pending entries). After processing a message, the consumer acknowledges it with the XACK command, which removes it from the group’s pending entries list.
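
    The interplay between delivery and acknowledgment can be sketched with an in-memory model of a single consumer group. This is not Redis client code: group, readNew, and ack are hypothetical names, and the model assumes the group starts at the beginning of the stream.

```go
package main

import "fmt"

// entry is one stream entry: an ID plus a payload.
type entry struct{ id, value string }

// group is an in-memory stand-in for one consumer group on one stream.
type group struct {
	entries []entry           // the stream, oldest first
	next    int               // index of the first entry never delivered to the group
	pending map[string]string // entry ID -> consumer that received it, awaiting ack
}

func newGroup(entries []entry) *group {
	return &group{entries: entries, pending: map[string]string{}}
}

// readNew mimics XREADGROUP with COUNT 1 and the ">" ID: it hands the next
// never-delivered entry to the calling consumer and records it as pending.
func (g *group) readNew(consumer string) (entry, bool) {
	if g.next >= len(g.entries) {
		return entry{}, false
	}
	e := g.entries[g.next]
	g.next++
	g.pending[e.id] = consumer
	return e, true
}

// ack mimics XACK: it removes the entry from the pending list and reports
// whether the entry was actually pending.
func (g *group) ack(id string) bool {
	if _, ok := g.pending[id]; !ok {
		return false
	}
	delete(g.pending, id)
	return true
}

func main() {
	g := newGroup([]entry{{"1-0", "john"}, {"2-0", "tom"}, {"3-0", "sarah"}})

	a, _ := g.readNew("consumer-a")
	b, _ := g.readNew("consumer-b")
	fmt.Println("consumer-a got", a.value, "and consumer-b got", b.value)

	g.ack(a.id)
	fmt.Println("entries still pending:", len(g.pending))
}
```

    Each entry is handed to exactly one consumer, and it stays in the pending list until that consumer acknowledges it.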

    Example: Using Consumer Groups in Redis Streams

    package main
    
    import (
    	"context"
    	"errors"
    	"fmt"
    	"log"
    	"time"
    
    	"github.com/redis/go-redis/v9"
    )
    
    const stream = "demo-stream"
    const consumerGroupName = "demo-group"
    
    func main() {
    
    	fmt.Println("redis streams consumer application started")
    
    	client := redis.NewClient(&redis.Options{Addr: "localhost:6379"})
    
    	ctx := context.Background()
    
    	_, err := client.Ping(ctx).Result()
    	if err != nil {
    		log.Fatal("failed to connect", err)
    	}
    
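    	// Create the group (and the stream, if missing). The BUSYGROUP error returned when
    	// the group already exists, e.g. when a second instance starts, is ignored here.
    	client.XGroupCreateMkStream(ctx, stream, consumerGroupName, "$")
    
    	// XREADGROUP expects a consumer name; derive one that is unique per running instance.
    	consumerName := fmt.Sprintf("consumer-%d", time.Now().UnixNano())
    
    	for {
    		result, err := client.XReadGroup(ctx, &redis.XReadGroupArgs{
    			Streams:  []string{stream, ">"},
    			Group:    consumerGroupName,
    			Consumer: consumerName,
    			Block:    1 * time.Second,
    		}).Result()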
    
    		if err != nil {
    			if errors.Is(err, redis.Nil) {
    				continue
    			}
    			log.Fatal("xreadgroup error", err)
    		}
    
    		for _, s := range result {
    			for _, message := range s.Messages {
    				fmt.Println("got data from stream -", message.Values)
    
    				// Check the error returned by XAck itself, not the earlier read error.
    				if err := client.XAck(ctx, stream, consumerGroupName, message.ID).Err(); err != nil {
    					log.Fatal("xack failed for message ", message.ID, ": ", err)
    				}
    
    				fmt.Println("acknowledged message", message.ID)
    			}
    		}
    	}
    
    }

    Before executing the program, let’s understand its functionality:

    • We start, as always, by connecting to Redis. In this case, it’s a local instance on localhost and port 6379. The Ping command checks the connection to the Redis server. If an error is detected, the program exits.
    • The XGroupCreateMkStream function creates a new consumer group called “demo-group” on the “demo-stream” Redis stream, creating the stream as well if it does not exist yet. The $ symbol is passed as the start ID, so the group receives only entries added after its creation; entries already in the stream are skipped.
    • A loop commences to continuously read messages from the consumer group.
      • Within the loop, the XReadGroup function fetches messages from the “demo-stream,” utilizing the “demo-group” consumer group. The Streams parameter is set to []string{stream, ">"}, instructing it to read from “demo-stream” only the entries that have not yet been delivered to any consumer in the group. The Block parameter is set to one second, so the call waits up to one second for new entries before returning a redis.Nil error.
      • If an error occurs while executing XReadGroup, the program determines whether the error is a redis.Nil error, signifying no available messages. In this scenario, the loop advances to the next iteration.
      • If messages are returned by XReadGroup, the program iterates through the results and messages, printing the values of each retrieved message.
      • After each message is processed, the XAck function acknowledges the message. This informs the consumer group that the message has been processed successfully and can be removed from pending messages. If an error occurs during this acknowledgment, the program exits.
    • Finally, the loop restarts to read the next batch of messages from the Redis Stream.

    To run the above program, copy it to a file named main.go and execute the following command:

    go run main.go

    Produce data using redis-cli.

    docker exec -it redis redis-cli 

    Once you’re connected, you should see this prompt:

    127.0.0.1:6379>

    Use XADD to send data to the Redis Stream (demo-stream):

    xadd demo-stream * name john email jdoe@test.com
    xadd demo-stream * name tom email tom@test.com

    You should see the following output (the message IDs will be different in your case):

    redis streams consumer application started
    got data from stream - map[email:jdoe@test.com name:john]
    acknowledged message 1686365267267-0
    got data from stream - map[email:tom@test.com name:tom]
    acknowledged message 1686365268153-0
    ...

    Start another instance of the Redis Streams consumer application in a new terminal window.

    go run main.go

    Continue to produce data using redis-cli.

    xadd demo-stream * name michael email michael@test.com
    xadd demo-stream * name sarah email sarah@test.com
    xadd demo-stream * name shaun email shaun@test.com

    These messages will be consumed by the two consumer group instances in a load-balanced way. For example, the second consumer application instance might receive michael and sarah, while the first instance might receive shaun.

    You can also start additional instances to see how the messages are distributed among them.

    Redis Streams also offer commands for managing pending messages. The XPENDING command enables you to obtain information about pending messages in a consumer group. This information includes the number of pending messages and the IDs of both the earliest and latest pending messages. The XCLAIM command allows a consumer to assert ownership of a pending message and commence its processing.
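
    Claiming can be pictured as a transfer of rows in the pending entries list. The sketch below is an in-memory model in the spirit of XCLAIM and XAUTOCLAIM with a minimum idle time; pendingEntry and claimIdle are hypothetical names, and the idle values are made up for illustration.

```go
package main

import (
	"fmt"
	"sort"
)

// pendingEntry models one row of a consumer group's pending entries list.
type pendingEntry struct {
	owner      string // consumer the entry was last delivered to
	idleMs     int64  // milliseconds since that delivery
	deliveries int    // how many times the entry has been delivered
}

// claimIdle moves every entry that has been idle for at least minIdleMs to
// newOwner, resets its idle time, and bumps its delivery counter.
// It returns the claimed IDs, sorted for stable output.
func claimIdle(pel map[string]*pendingEntry, newOwner string, minIdleMs int64) []string {
	var claimed []string
	for id, p := range pel {
		if p.idleMs >= minIdleMs {
			p.owner = newOwner
			p.idleMs = 0
			p.deliveries++
			claimed = append(claimed, id)
		}
	}
	sort.Strings(claimed)
	return claimed
}

func main() {
	// Illustrative data: the first entry has been stuck for 90 seconds.
	pel := map[string]*pendingEntry{
		"1686365267267-0": {owner: "consumer-1", idleMs: 90000, deliveries: 1},
		"1686365268153-0": {owner: "consumer-1", idleMs: 500, deliveries: 1},
	}

	claimed := claimIdle(pel, "consumer-2", 60000)
	fmt.Println("claimed:", claimed)
	fmt.Println("new owner:", pel["1686365267267-0"].owner)
}
```

    Only the entry that has been idle longer than the threshold changes hands, which is how a healthy consumer can take over work from one that has stalled.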

    Conclusion

    In this article, we explored the plethora of options that Redis offers for constructing scalable, messaging-based solutions. We provided overviews of Redis List, Redis Pub/Sub, and Redis Streams, highlighting the key differences and offering guidance on which solution to use under various circumstances. Finally, we showcased practical examples illustrating how to employ these data structures in your applications.

    I picked up most of my skills during the years I worked at IBM. Was a DBA, developer, and cloud engineer for a time. After that, I went into freelancing, where I found the passion for writing. Now, I'm a full-time writer at Semaphore.