Top Related Projects
Asynchronous HTTP client/server framework for asyncio and Python
A next generation HTTP client for Python. 🦋
Pure Python RabbitMQ/AMQP 0-9-1 client library
paho.mqtt.python
Quick Overview
NATS.py is the official Python client library for NATS, a lightweight, high-performance cloud native messaging system. It provides a simple and efficient way to interact with NATS servers, enabling developers to build distributed systems and microservices using Python.
Pros
- Easy to use and intuitive API
- Supports both synchronous and asynchronous communication patterns
- Provides built-in support for request-reply, pub-sub, and queue groups
- Offers automatic reconnection and error handling capabilities
Cons
- Limited documentation compared to some other NATS client libraries
- May have performance overhead compared to lower-level implementations
- Requires external NATS server setup for full functionality
- Some advanced NATS features may not be fully implemented
Code Examples
- Basic publish and subscribe:
import asyncio
from nats.aio.client import Client as NATS
async def main():
nc = NATS()
await nc.connect("nats://localhost:4222")
async def message_handler(msg):
print(f"Received: {msg.data.decode()}")
await nc.subscribe("greetings", cb=message_handler)
await nc.publish("greetings", b"Hello NATS!")
await asyncio.sleep(1)
asyncio.run(main())
- Request-reply pattern:
import asyncio
from nats.aio.client import Client as NATS
async def main():
nc = NATS()
await nc.connect("nats://localhost:4222")
async def respond(msg):
await nc.publish(msg.reply, b"World")
await nc.subscribe("hello", cb=respond)
response = await nc.request("hello", b"Hello")
print(f"Response: {response.data.decode()}")
asyncio.run(main())
- Queue groups:
import asyncio
from nats.aio.client import Client as NATS
async def main():
nc = NATS()
await nc.connect("nats://localhost:4222")
async def worker(msg):
print(f"Worker received: {msg.data.decode()}")
await nc.subscribe("tasks", "workers", cb=worker)
await nc.publish("tasks", b"New task")
await asyncio.sleep(1)
asyncio.run(main())
Getting Started
To get started with NATS.py, follow these steps:
-
Install the library using pip:
pip install nats-py
-
Import the NATS client in your Python script:
from nats.aio.client import Client as NATS
-
Connect to a NATS server and start using the client:
import asyncio async def main(): nc = NATS() await nc.connect("nats://localhost:4222") # Use nc to publish, subscribe, or make requests asyncio.run(main())
Make sure you have a NATS server running locally or specify the appropriate server address when connecting.
Competitor Comparisons
Asynchronous HTTP client/server framework for asyncio and Python
Pros of aiohttp
- More general-purpose HTTP client/server framework, suitable for a wider range of applications
- Extensive documentation and larger community support
- Built-in support for WebSockets and server-side events
Cons of aiohttp
- Steeper learning curve due to its broader feature set
- Potentially higher overhead for simple messaging tasks compared to NATS-specific solutions
Code Comparison
aiohttp example:
async with aiohttp.ClientSession() as session:
async with session.get('http://example.com') as response:
print(await response.text())
nats.py example:
async with NATS() as nc:
await nc.connect()
await nc.publish("subject", b"Hello World!")
await nc.subscribe("subject", cb=message_handler)
Summary
aiohttp is a versatile HTTP client/server framework for asyncio, while nats.py is specifically designed for NATS messaging. aiohttp offers more flexibility for general web-related tasks, but nats.py provides a simpler, more focused solution for NATS-based messaging systems. The choice between them depends on the specific requirements of your project and whether you need a general-purpose HTTP framework or a specialized NATS client.
A next generation HTTP client for Python. 🦋
Pros of httpx
- Broader scope: Supports both sync and async HTTP requests, while nats.py focuses on NATS messaging
- More comprehensive HTTP features: Includes client-side cookie handling, automatic retries, and HTTP/2 support
- Active development: More frequent updates and larger community contribution
Cons of httpx
- Larger footprint: More dependencies and a larger codebase compared to the lightweight nats.py
- Learning curve: More complex API due to its comprehensive feature set
- Not specialized: Lacks specific features for messaging systems that nats.py provides
Code Comparison
httpx example:
import httpx
async with httpx.AsyncClient() as client:
response = await client.get("https://example.com")
print(response.text)
nats.py example:
import asyncio
import nats
async def main():
nc = await nats.connect("nats://localhost:4222")
await nc.publish("foo", b"Hello World!")
await nc.close()
asyncio.run(main())
The code examples highlight the different focus areas of each library. httpx is designed for HTTP requests, while nats.py specializes in NATS messaging system operations.
Pure Python RabbitMQ/AMQP 0-9-1 client library
Pros of Pika
- Mature and widely adopted RabbitMQ client for Python
- Extensive documentation and community support
- Supports both synchronous and asynchronous programming models
Cons of Pika
- Specifically designed for RabbitMQ, limiting its use with other message brokers
- Can be more complex to set up and configure compared to NATS.py
- Performance may be slower for high-throughput scenarios
Code Comparison
NATS.py example:
import asyncio
from nats.aio.client import Client as NATS
async def main():
nc = NATS()
await nc.connect()
await nc.publish("foo", b"Hello World!")
await nc.close()
asyncio.run(main())
Pika example:
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.basic_publish(exchange='', routing_key='hello', body='Hello World!')
connection.close()
Both libraries provide straightforward ways to publish messages, but NATS.py uses an asynchronous approach, while Pika offers a blocking connection in this example. NATS.py's code is more concise and leverages Python's async/await syntax, potentially leading to better performance in high-concurrency scenarios.
paho.mqtt.python
Pros of paho.mqtt.python
- More mature and widely adopted MQTT client library
- Supports a broader range of MQTT versions (3.1, 3.1.1, and 5.0)
- Extensive documentation and community support
Cons of paho.mqtt.python
- Limited to MQTT protocol, while nats.py supports NATS protocol
- May have slightly higher overhead due to broader feature set
- Less focus on performance optimizations compared to nats.py
Code Comparison
paho.mqtt.python:
import paho.mqtt.client as mqtt
client = mqtt.Client()
client.connect("broker.hivemq.com", 1883, 60)
client.publish("topic", "message")
client.loop_forever()
nats.py:
import asyncio
import nats
async def main():
nc = await nats.connect("nats://demo.nats.io:4222")
await nc.publish("topic", b"message")
await nc.close()
asyncio.run(main())
Both libraries provide simple APIs for connecting to their respective message brokers and publishing messages. paho.mqtt.python uses a callback-based approach, while nats.py leverages Python's asyncio for asynchronous operations. The nats.py example demonstrates its focus on modern async programming patterns.
Convert
designs to code with AI
Introducing Visual Copilot: A new AI model to turn Figma designs to high quality code using your components.
Try Visual CopilotREADME
NATS - Python3 Client for Asyncio
An asyncio Python client for the NATS messaging system.
Supported platforms
Should be compatible with at least Python +3.8.
Installing
pip install nats-py
Getting started
import asyncio
import nats
from nats.errors import ConnectionClosedError, TimeoutError, NoServersError
async def main():
# It is very likely that the demo server will see traffic from clients other than yours.
# To avoid this, start your own locally and modify the example to use it.
nc = await nats.connect("nats://demo.nats.io:4222")
# You can also use the following for TLS against the demo server.
#
# nc = await nats.connect("tls://demo.nats.io:4443")
async def message_handler(msg):
subject = msg.subject
reply = msg.reply
data = msg.data.decode()
print("Received a message on '{subject} {reply}': {data}".format(
subject=subject, reply=reply, data=data))
# Simple publisher and async subscriber via coroutine.
sub = await nc.subscribe("foo", cb=message_handler)
# Stop receiving after 2 messages.
await sub.unsubscribe(limit=2)
await nc.publish("foo", b'Hello')
await nc.publish("foo", b'World')
await nc.publish("foo", b'!!!!!')
# Synchronous style with iterator also supported.
sub = await nc.subscribe("bar")
await nc.publish("bar", b'First')
await nc.publish("bar", b'Second')
try:
async for msg in sub.messages:
print(f"Received a message on '{msg.subject} {msg.reply}': {msg.data.decode()}")
await sub.unsubscribe()
except Exception as e:
pass
async def help_request(msg):
print(f"Received a message on '{msg.subject} {msg.reply}': {msg.data.decode()}")
await nc.publish(msg.reply, b'I can help')
# Use queue named 'workers' for distributing requests
# among subscribers.
sub = await nc.subscribe("help", "workers", help_request)
# Send a request and expect a single response
# and trigger timeout if not faster than 500 ms.
try:
response = await nc.request("help", b'help me', timeout=0.5)
print("Received response: {message}".format(
message=response.data.decode()))
except TimeoutError:
print("Request timed out")
# Remove interest in subscription.
await sub.unsubscribe()
# Terminate connection to NATS.
await nc.drain()
if __name__ == '__main__':
asyncio.run(main())
JetStream
Starting v2.0.0 series, the client now has JetStream support:
import asyncio
import nats
from nats.errors import TimeoutError
async def main():
nc = await nats.connect("localhost")
# Create JetStream context.
js = nc.jetstream()
# Persist messages on 'foo's subject.
await js.add_stream(name="sample-stream", subjects=["foo"])
for i in range(0, 10):
ack = await js.publish("foo", f"hello world: {i}".encode())
print(ack)
# Create pull based consumer on 'foo'.
psub = await js.pull_subscribe("foo", "psub")
# Fetch and ack messagess from consumer.
for i in range(0, 10):
msgs = await psub.fetch(1)
for msg in msgs:
await msg.ack()
print(msg)
# Create single ephemeral push based subscriber.
sub = await js.subscribe("foo")
msg = await sub.next_msg()
await msg.ack()
# Create single push based subscriber that is durable across restarts.
sub = await js.subscribe("foo", durable="myapp")
msg = await sub.next_msg()
await msg.ack()
# Create deliver group that will be have load balanced messages.
async def qsub_a(msg):
print("QSUB A:", msg)
await msg.ack()
async def qsub_b(msg):
print("QSUB B:", msg)
await msg.ack()
await js.subscribe("foo", "workers", cb=qsub_a)
await js.subscribe("foo", "workers", cb=qsub_b)
for i in range(0, 10):
ack = await js.publish("foo", f"hello world: {i}".encode())
print("\t", ack)
# Create ordered consumer with flow control and heartbeats
# that auto resumes on failures.
osub = await js.subscribe("foo", ordered_consumer=True)
data = bytearray()
while True:
try:
msg = await osub.next_msg()
data.extend(msg.data)
except TimeoutError:
break
print("All data in stream:", len(data))
await nc.close()
if __name__ == '__main__':
asyncio.run(main())
TLS
TLS connections can be configured with an ssl context
ssl_ctx = ssl.create_default_context(purpose=ssl.Purpose.SERVER_AUTH)
ssl_ctx.load_verify_locations('ca.pem')
ssl_ctx.load_cert_chain(certfile='client-cert.pem',
keyfile='client-key.pem')
await nats.connect(servers=["tls://127.0.0.1:4443"], tls=ssl_ctx, tls_hostname="localhost")
Setting the scheme to tls
in the connect URL will make the client create a default ssl context automatically:
import asyncio
import ssl
from nats.aio.client import Client as NATS
async def run():
nc = NATS()
await nc.connect("tls://demo.nats.io:4443")
Note: If getting SSL certificate errors in OS X, try first installing the certifi
certificate bundle. If using Python 3.7 for example, then run:
$ /Applications/Python\ 3.7/Install\ Certificates.command
-- pip install --upgrade certifi
Collecting certifi
...
-- removing any existing file or link
-- creating symlink to certifi certificate bundle
-- setting permissions
-- update complete
NKEYS and JWT User Credentials
Since v0.9.0 release, you can also optionally install NKEYS in order to use the new NATS v2.0 auth features:
pip install nats-py[nkeys]
Usage:
await nats.connect("tls://connect.ngs.global:4222", user_credentials="/path/to/secret.creds")
Development
- Install nats server.
- Make sure the server is available in your PATH:
nats-server -v
. - Install dependencies:
python3 -m pipenv install --dev
. - Run tests:
python3 -m pytest
.
Updating Docs
To update the docs, first checkout the docs
branch under a local copy of the nats.py
repo
as follows:
git clone https://github.com/nats-io/nats.py
cd nats.py
git clone https://github.com/nats-io/nats.py --branch docs --single-branch docs
cd docs
pipenv install --dev sphinx sphinx_autodoc_typehints myst_parser furo pygments
pipenv shell
make html
# preview the changes:
make serve
If you are happy with the changes, make a PR on the docs branch:
make publish
git add docs
License
Unless otherwise noted, the NATS source files are distributed under the Apache Version 2.0 license found in the LICENSE file.
Top Related Projects
Asynchronous HTTP client/server framework for asyncio and Python
A next generation HTTP client for Python. 🦋
Pure Python RabbitMQ/AMQP 0-9-1 client library
paho.mqtt.python
Convert
designs to code with AI
Introducing Visual Copilot: A new AI model to turn Figma designs to high quality code using your components.
Try Visual Copilot