
nats-io/nats.py

Python3 client for NATS


Top Related Projects

  • aiohttp (15,880): Asynchronous HTTP client/server framework for asyncio and Python
  • httpx (14,408): A next generation HTTP client for Python. 🦋
  • Pika (3,778): Pure Python RabbitMQ/AMQP 0-9-1 client library
  • paho.mqtt.python

Quick Overview

NATS.py is the official Python client library for NATS, a lightweight, high-performance cloud native messaging system. It provides a simple and efficient way to interact with NATS servers, enabling developers to build distributed systems and microservices using Python.

Pros

  • Easy to use and intuitive API
  • Built on asyncio, with both callback-based and iterator-style subscriptions
  • Provides built-in support for request-reply, pub-sub, and queue groups
  • Offers automatic reconnection and error handling capabilities
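
The reconnection and error handling listed above are configured through callbacks and options passed at connect time. The following is a minimal sketch, assuming the `disconnected_cb`, `reconnected_cb`, `error_cb`, `closed_cb`, `max_reconnect_attempts` and `reconnect_time_wait` keyword arguments of `nats.connect` as found in current nats-py releases. The helper names `connection_options` and `connect_resilient` and the `events` list are illustrative only.

```python
import asyncio


def connection_options(events):
    """Build keyword arguments for nats.connect() that record lifecycle events.

    `events` is a plain list used here for illustration; a real application
    would more likely log or update metrics.
    """

    async def on_disconnected():
        events.append("disconnected")

    async def on_reconnected():
        events.append("reconnected")

    async def on_error(exc):
        events.append(f"error: {exc!r}")

    async def on_closed():
        events.append("closed")

    return {
        "disconnected_cb": on_disconnected,
        "reconnected_cb": on_reconnected,
        "error_cb": on_error,
        "closed_cb": on_closed,
        "max_reconnect_attempts": 10,  # upper bound on reconnect attempts
        "reconnect_time_wait": 2,      # seconds to wait between attempts
    }


async def connect_resilient(url="nats://localhost:4222"):
    # Imported here so connection_options() can be used without nats-py installed.
    import nats

    events = []
    nc = await nats.connect(url, **connection_options(events))
    return nc, events
```

With a server running, `nc, events = await connect_resilient()` returns a connected client, and `events` fills up as the connection state changes.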

Cons

  • Limited documentation compared to some other NATS client libraries
  • May have performance overhead compared to lower-level implementations
  • Requires external NATS server setup for full functionality
  • Some advanced NATS features may not be fully implemented

Code Examples

  1. Basic publish and subscribe:
import asyncio
from nats.aio.client import Client as NATS

async def main():
    nc = NATS()
    await nc.connect("nats://localhost:4222")

    async def message_handler(msg):
        print(f"Received: {msg.data.decode()}")

    await nc.subscribe("greetings", cb=message_handler)
    await nc.publish("greetings", b"Hello NATS!")
    await asyncio.sleep(1)  # give the handler time to run
    await nc.close()

asyncio.run(main())
  2. Request-reply pattern:
import asyncio
from nats.aio.client import Client as NATS

async def main():
    nc = NATS()
    await nc.connect("nats://localhost:4222")

    async def respond(msg):
        await nc.publish(msg.reply, b"World")

    await nc.subscribe("hello", cb=respond)

    response = await nc.request("hello", b"Hello")
    print(f"Response: {response.data.decode()}")
    await nc.close()

asyncio.run(main())
  3. Queue groups:
import asyncio
from nats.aio.client import Client as NATS

async def main():
    nc = NATS()
    await nc.connect("nats://localhost:4222")

    async def worker(msg):
        print(f"Worker received: {msg.data.decode()}")

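    # Subscribers sharing the "workers" queue group split the messages between them.
    await nc.subscribe("tasks", queue="workers", cb=worker)
    await nc.publish("tasks", b"New task")
    await asyncio.sleep(1)
    await nc.close()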

asyncio.run(main())
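
The examples above send and receive raw `bytes`, so structured payloads have to be serialized by the application. A minimal sketch using JSON follows; `encode_payload` and `decode_payload` are hypothetical helper names, not part of nats.py.

```python
import json


def encode_payload(obj):
    """Serialize a JSON-compatible object to UTF-8 bytes for nc.publish()."""
    return json.dumps(obj, separators=(",", ":")).encode("utf-8")


def decode_payload(data):
    """Parse the bytes found in msg.data back into a Python object."""
    return json.loads(data.decode("utf-8"))


task = {"id": 7, "action": "resize"}
wire = encode_payload(task)
print(wire)                  # b'{"id":7,"action":"resize"}'
print(decode_payload(wire))  # {'id': 7, 'action': 'resize'}
```

On the publishing side this becomes `await nc.publish("tasks", encode_payload(task))`, and inside a handler `decode_payload(msg.data)`.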

Getting Started

To get started with NATS.py, follow these steps:

  1. Install the library using pip:

    pip install nats-py
    
  2. Import the NATS client in your Python script:

    from nats.aio.client import Client as NATS
    
  3. Connect to a NATS server and start using the client:

    import asyncio
    
    async def main():
        nc = NATS()
        await nc.connect("nats://localhost:4222")
        # Use nc to publish, subscribe, or make requests
    
    asyncio.run(main())
    

Make sure you have a NATS server running locally or specify the appropriate server address when connecting.
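
One way to avoid hard-coding the server address is to read it from the environment. This sketch assumes a `NATS_URL` variable holding a comma-separated list of URLs; that variable name and the `server_urls` helper are conventions chosen for this example, not something nats.py reads on its own.

```python
import os

DEFAULT_URL = "nats://localhost:4222"


def server_urls(env=None):
    """Return a list of NATS server URLs.

    Reads a comma-separated list from the NATS_URL environment variable
    (a naming convention assumed for this sketch) and falls back to a
    local server when it is unset or empty.
    """
    env = os.environ if env is None else env
    raw = env.get("NATS_URL", "")
    urls = [u.strip() for u in raw.split(",") if u.strip()]
    return urls or [DEFAULT_URL]
```

The result can be passed straight to the client, for example `await nc.connect(servers=server_urls())`.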

Competitor Comparisons

aiohttp

Asynchronous HTTP client/server framework for asyncio and Python

Pros of aiohttp

  • More general-purpose HTTP client/server framework, suitable for a wider range of applications
  • Extensive documentation and larger community support
  • Built-in support for WebSockets on both the client and the server side

Cons of aiohttp

  • Steeper learning curve due to its broader feature set
  • Potentially higher overhead for simple messaging tasks compared to NATS-specific solutions

Code Comparison

aiohttp example:

async with aiohttp.ClientSession() as session:
    async with session.get('http://example.com') as response:
        print(await response.text())

nats.py example:

async with NATS() as nc:
    await nc.connect()
    # message_handler is a coroutine defined elsewhere; subscribe before publishing.
    await nc.subscribe("subject", cb=message_handler)
    await nc.publish("subject", b"Hello World!")

Summary

aiohttp is a versatile HTTP client/server framework for asyncio, while nats.py is specifically designed for NATS messaging. aiohttp offers more flexibility for general web-related tasks, but nats.py provides a simpler, more focused solution for NATS-based messaging systems. The choice between them depends on the specific requirements of your project and whether you need a general-purpose HTTP framework or a specialized NATS client.

httpx

A next generation HTTP client for Python. 🦋

Pros of httpx

  • Broader scope: Supports both sync and async HTTP requests, while nats.py focuses on NATS messaging
  • More comprehensive HTTP features: Includes client-side cookie handling, automatic retries, and HTTP/2 support
  • Active development: More frequent updates and larger community contribution

Cons of httpx

  • Larger footprint: More dependencies and a larger codebase compared to the lightweight nats.py
  • Learning curve: More complex API due to its comprehensive feature set
  • Not specialized: Lacks specific features for messaging systems that nats.py provides

Code Comparison

httpx example:

import asyncio
import httpx

async def main():
    async with httpx.AsyncClient() as client:
        response = await client.get("https://example.com")
        print(response.text)

asyncio.run(main())

nats.py example:

import asyncio
import nats

async def main():
    nc = await nats.connect("nats://localhost:4222")
    await nc.publish("foo", b"Hello World!")
    await nc.close()

asyncio.run(main())

The code examples highlight the different focus areas of each library. httpx is designed for HTTP requests, while nats.py specializes in NATS messaging system operations.

Pika

Pure Python RabbitMQ/AMQP 0-9-1 client library

Pros of Pika

  • Mature and widely adopted RabbitMQ client for Python
  • Extensive documentation and community support
  • Supports both synchronous and asynchronous programming models

Cons of Pika

  • Specifically designed for RabbitMQ, limiting its use with other message brokers
  • Can be more complex to set up and configure compared to NATS.py
  • Performance may be slower for high-throughput scenarios

Code Comparison

NATS.py example:

import asyncio
from nats.aio.client import Client as NATS

async def main():
    nc = NATS()
    await nc.connect()
    await nc.publish("foo", b"Hello World!")
    await nc.close()

asyncio.run(main())

Pika example:

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.basic_publish(exchange='', routing_key='hello', body='Hello World!')
connection.close()

Both libraries provide straightforward ways to publish messages. NATS.py uses an asynchronous approach built on Python's async/await syntax, while Pika uses a blocking connection in this example. The asyncio model can help in high-concurrency scenarios, although actual performance depends on the workload and the broker.

paho.mqtt.python

Pros of paho.mqtt.python

  • More mature and widely adopted MQTT client library
  • Supports a broader range of MQTT versions (3.1, 3.1.1, and 5.0)
  • Extensive documentation and community support

Cons of paho.mqtt.python

  • Limited to MQTT protocol, while nats.py supports NATS protocol
  • May have slightly higher overhead due to broader feature set
  • Less focus on performance optimizations compared to nats.py

Code Comparison

paho.mqtt.python:

import paho.mqtt.client as mqtt

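# paho-mqtt 2.x requires the callback API version; with 1.x use mqtt.Client().
client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2)
client.connect("broker.hivemq.com", 1883, 60)
client.loop_start()  # run the network loop in a background thread
info = client.publish("topic", "message")
info.wait_for_publish()  # block until the message has been sent
client.loop_stop()
client.disconnect()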

nats.py:

import asyncio
import nats

async def main():
    nc = await nats.connect("nats://demo.nats.io:4222")
    await nc.publish("topic", b"message")
    await nc.close()

asyncio.run(main())

Both libraries provide simple APIs for connecting to their respective message brokers and publishing messages. paho.mqtt.python uses a callback-based approach, while nats.py leverages Python's asyncio for asynchronous operations. The nats.py example demonstrates its focus on modern async programming patterns.


README

NATS - Python3 Client for Asyncio

An asyncio Python client for the NATS messaging system.


Supported platforms

Should be compatible with Python 3.8 and later.

Installing

pip install nats-py

Getting started

import asyncio
import nats
from nats.errors import ConnectionClosedError, TimeoutError, NoServersError

async def main():
    # It is very likely that the demo server will see traffic from clients other than yours.
    # To avoid this, start your own locally and modify the example to use it.
    nc = await nats.connect("nats://demo.nats.io:4222")

    # You can also use the following for TLS against the demo server.
    #
    # nc = await nats.connect("tls://demo.nats.io:4443")

    async def message_handler(msg):
        subject = msg.subject
        reply = msg.reply
        data = msg.data.decode()
        print("Received a message on '{subject} {reply}': {data}".format(
            subject=subject, reply=reply, data=data))

    # Simple publisher and async subscriber via coroutine.
    sub = await nc.subscribe("foo", cb=message_handler)

    # Stop receiving after 2 messages.
    await sub.unsubscribe(limit=2)
    await nc.publish("foo", b'Hello')
    await nc.publish("foo", b'World')
    await nc.publish("foo", b'!!!!!')

    # Synchronous style with iterator also supported.
    sub = await nc.subscribe("bar")
    await nc.publish("bar", b'First')
    await nc.publish("bar", b'Second')

    try:
        async for msg in sub.messages:
            print(f"Received a message on '{msg.subject} {msg.reply}': {msg.data.decode()}")
            await sub.unsubscribe()
    except Exception as e:
        pass

    async def help_request(msg):
        print(f"Received a message on '{msg.subject} {msg.reply}': {msg.data.decode()}")
        await nc.publish(msg.reply, b'I can help')

    # Use queue named 'workers' for distributing requests
    # among subscribers.
    sub = await nc.subscribe("help", "workers", help_request)

    # Send a request and expect a single response
    # and trigger timeout if not faster than 500 ms.
    try:
        response = await nc.request("help", b'help me', timeout=0.5)
        print("Received response: {message}".format(
            message=response.data.decode()))
    except TimeoutError:
        print("Request timed out")

    # Remove interest in subscription.
    await sub.unsubscribe()

    # Terminate connection to NATS.
    await nc.drain()

if __name__ == '__main__':
    asyncio.run(main())
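
The request step above gives up after a single timeout. Where a retry is acceptable, the pattern can be wrapped in a small helper. This is a sketch under two assumptions: `request_with_retry` is a hypothetical name, and `nats.errors.TimeoutError` is assumed to derive from `asyncio.TimeoutError`, so catching the latter covers both.

```python
import asyncio


async def request_with_retry(nc, subject, payload, attempts=3, timeout=0.5):
    """Send a request and retry on timeout, returning the reply or None.

    `nc` is any object with an awaitable request(subject, payload, timeout=...)
    method, such as a connected nats.py client.
    """
    for attempt in range(1, attempts + 1):
        try:
            return await nc.request(subject, payload, timeout=timeout)
        except asyncio.TimeoutError:
            # nats.errors.TimeoutError is assumed to subclass asyncio.TimeoutError.
            print(f"attempt {attempt} timed out")
    return None
```

Inside `main()` this would be called as `response = await request_with_retry(nc, "help", b"help me")`. A request sent to a subject with no subscribers may raise `nats.errors.NoRespondersError` rather than time out; the sketch does not handle that case.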

JetStream

Starting with the v2.0.0 series, the client has JetStream support:

import asyncio
import nats
from nats.errors import TimeoutError

async def main():
    nc = await nats.connect("localhost")

    # Create JetStream context.
    js = nc.jetstream()

    # Persist messages on the 'foo' subject.
    await js.add_stream(name="sample-stream", subjects=["foo"])

    for i in range(0, 10):
        ack = await js.publish("foo", f"hello world: {i}".encode())
        print(ack)

    # Create pull based consumer on 'foo'.
    psub = await js.pull_subscribe("foo", "psub")

    # Fetch and ack messages from consumer.
    for i in range(0, 10):
        msgs = await psub.fetch(1)
        for msg in msgs:
            await msg.ack()
            print(msg)

    # Create single ephemeral push based subscriber.
    sub = await js.subscribe("foo")
    msg = await sub.next_msg()
    await msg.ack()

    # Create single push based subscriber that is durable across restarts.
    sub = await js.subscribe("foo", durable="myapp")
    msg = await sub.next_msg()
    await msg.ack()

    # Create a deliver group so that messages are load balanced across its members.
    async def qsub_a(msg):
        print("QSUB A:", msg)
        await msg.ack()

    async def qsub_b(msg):
        print("QSUB B:", msg)
        await msg.ack()
    await js.subscribe("foo", "workers", cb=qsub_a)
    await js.subscribe("foo", "workers", cb=qsub_b)

    for i in range(0, 10):
        ack = await js.publish("foo", f"hello world: {i}".encode())
        print("\t", ack)

    # Create ordered consumer with flow control and heartbeats
    # that auto resumes on failures.
    osub = await js.subscribe("foo", ordered_consumer=True)
    data = bytearray()

    while True:
        try:
            msg = await osub.next_msg()
            data.extend(msg.data)
        except TimeoutError:
            break
    print("All data in stream:", len(data))

    await nc.close()

if __name__ == '__main__':
    asyncio.run(main())
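
The pull consumer above fetches one message at a time for a fixed number of iterations. When the number of pending messages is not known in advance, a loop that fetches in batches until a fetch times out is a common alternative. The sketch below assumes `psub.fetch(batch, timeout=...)` raises a timeout error that derives from `asyncio.TimeoutError` when nothing arrives; `consume_available` and `handle` are hypothetical names.

```python
import asyncio


async def consume_available(psub, handle, batch=5, timeout=1.0):
    """Fetch and ack messages in batches until a fetch times out.

    `psub` is expected to behave like the object returned by
    js.pull_subscribe(); `handle` is a hypothetical application callback.
    Returns the number of messages processed.
    """
    processed = 0
    while True:
        try:
            msgs = await psub.fetch(batch, timeout=timeout)
        except asyncio.TimeoutError:
            # No messages arrived within `timeout`; treat the stream as drained.
            break
        for msg in msgs:
            handle(msg.data)
            await msg.ack()  # ack only after the handler has returned
            processed += 1
    return processed
```

With the `psub` from the example above, `await consume_available(psub, print)` would print and acknowledge every pending payload.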

TLS

TLS connections can be configured with an ssl context:

import ssl
import nats

ssl_ctx = ssl.create_default_context(purpose=ssl.Purpose.SERVER_AUTH)
ssl_ctx.load_verify_locations('ca.pem')
ssl_ctx.load_cert_chain(certfile='client-cert.pem',
                        keyfile='client-key.pem')
# Call from inside a coroutine:
await nats.connect(servers=["tls://127.0.0.1:4443"], tls=ssl_ctx, tls_hostname="localhost")

Setting the scheme to tls in the connect URL will make the client create a default ssl context automatically:

import asyncio
from nats.aio.client import Client as NATS

async def run():
    nc = NATS()
    await nc.connect("tls://demo.nats.io:4443")
    await nc.close()

asyncio.run(run())
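
Both TLS setups rely on an `ssl.SSLContext`. When the same settings are needed in several places, building the context in one helper keeps them consistent. The following is a minimal sketch using only the standard library; the function name and its parameters are illustrative, and the minimum TLS version is a choice made for this example.

```python
import ssl


def make_tls_context(ca_file=None, cert_file=None, key_file=None):
    """Create a client-side SSL context for nats.connect(tls=...).

    With no arguments the system trust store is used. All parameter names
    are illustrative.
    """
    ctx = ssl.create_default_context(purpose=ssl.Purpose.SERVER_AUTH)
    ctx.minimum_version = ssl.TLSVersion.TLSv1_2  # refuse older protocol versions
    if ca_file:
        ctx.load_verify_locations(ca_file)  # trust a private CA
    if cert_file:
        ctx.load_cert_chain(certfile=cert_file, keyfile=key_file)  # client certificate
    return ctx


ctx = make_tls_context()
print(ctx.verify_mode == ssl.CERT_REQUIRED, ctx.check_hostname)  # True True
```

The returned context is passed as the `tls` argument, for example `await nats.connect("tls://127.0.0.1:4443", tls=make_tls_context("ca.pem"))`.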

Note: If you get SSL certificate errors on OS X, first try installing the certifi certificate bundle. With Python 3.7, for example, run:

$ /Applications/Python\ 3.7/Install\ Certificates.command
 -- pip install --upgrade certifi
Collecting certifi
...
 -- removing any existing file or link
 -- creating symlink to certifi certificate bundle
 -- setting permissions
 -- update complete

NKEYS and JWT User Credentials

Since the v0.9.0 release, you can optionally install NKEYS support in order to use the NATS v2.0 auth features:

pip install nats-py[nkeys]

Usage:

await nats.connect("tls://connect.ngs.global:4222", user_credentials="/path/to/secret.creds")
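
A wrong credentials path only surfaces once the client tries to connect, so it can help to validate the path first. A small sketch follows; `resolve_creds` is a hypothetical helper, not part of nats.py.

```python
import os
from pathlib import Path


def resolve_creds(path):
    """Expand and validate a path to a .creds file before connecting."""
    creds = Path(os.path.expandvars(path)).expanduser()
    if not creds.is_file():
        raise FileNotFoundError(f"credentials file not found: {creds}")
    return str(creds)
```

The result is then passed along as `user_credentials=resolve_creds("/path/to/secret.creds")`.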

Development

  1. Install nats server.
  2. Make sure the server is available in your PATH: nats-server -v.
  3. Install dependencies: python3 -m pipenv install --dev.
  4. Run tests: python3 -m pytest.
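
The PATH check in step 2 can also be scripted, for example at the start of a test session. This sketch uses only the standard library; `nats_server_version` is a hypothetical helper name, and the `which` parameter exists only so the lookup can be replaced in tests.

```python
import shutil
import subprocess


def nats_server_version(which=shutil.which):
    """Return the output of `nats-server -v`, or None if it is not in PATH."""
    exe = which("nats-server")
    if exe is None:
        return None
    result = subprocess.run([exe, "-v"], capture_output=True, text=True, check=True)
    return (result.stdout or result.stderr).strip()
```

A return value of `None` means the server binary still needs to be installed or added to PATH before the tests can run.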

Updating Docs

To update the docs, first check out the docs branch under a local copy of the nats.py repo as follows:

git clone https://github.com/nats-io/nats.py
cd nats.py
git clone https://github.com/nats-io/nats.py --branch docs --single-branch docs
cd docs
pipenv install --dev sphinx sphinx_autodoc_typehints myst_parser furo pygments 
pipenv shell
make html
# preview the changes:
make serve

If you are happy with the changes, make a PR on the docs branch:

make publish
git add docs

License

Unless otherwise noted, the NATS source files are distributed under the Apache Version 2.0 license found in the LICENSE file.