Programming Modbus Clients: Python Examples
Imagine you’re building a smart home system where your Python application needs to communicate with temperature sensors, relays, and other devices. Modbus is the perfect protocol for this, and Python’s pymodbus library makes it easy to implement.
In this tutorial, we’ll explore hands-on examples for programming Modbus clients using pymodbus. We’ll cover both synchronous and asynchronous approaches, with complete runnable code snippets for reading holding registers, writing coils, and handling errors.
Introduction to pymodbus
pymodbus is a full-featured Modbus library for Python that supports:
- Cross-platform compatibility
It’s the go-to library for Python developers working with Modbus devices.
Installation
First, let’s install pymodbus. Open your terminal and run:
“`bash
pip install pymodbus
“`
Prerequisites
For this tutorial, you’ll need:
- Basic Python programming knowledge
Synchronous Modbus Client
Synchronous clients are simpler for beginners—they execute operations one after another, waiting for each to complete before moving to the next.
Example 1: Reading Holding Registers
Holding registers are the most common data type in Modbus, used for storing numerical values (e.g., temperature, pressure, counters).
“`python
save as sync_read_holding_registers.py
from pymodbus.client import ModbusTcpClient
from pymodbus.exceptions import ModbusException
Modbus server parameters
SERVER_HOST = “127.0.0.1” # Replace with your server IP
SERVER_PORT = 502
SLAVE_ID = 1
Register parameters
START_ADDRESS = 0
REGISTERS_COUNT = 3
Create Modbus TCP client
client = ModbusTcpClient(host=SERVER_HOST, port=SERVER_PORT)
try:
# Connect to the server
client.connect()
print(f”Connected to Modbus server at {SERVER_HOST}:{SERVER_PORT}”)
# Read holding registers
response = client.read_holding_registers(
address=START_ADDRESS,
count=REGISTERS_COUNT,
slave=SLAVE_ID
)
# Check if response is valid
if not response.isError():
# Extract and print the register values
print(f”\nRead {REGISTERS_COUNT} holding registers from address {START_ADDRESS}:”)
for i, value in enumerate(response.registers):
print(f”Register {START_ADDRESS + i}: {value}”)
else:
print(f”Error reading registers: {response}”)
except ModbusException as e:
print(f”Modbus error: {e}”)
except Exception as e:
print(f”General error: {e}”)
finally:
# Close the connection
client.close()
print(f”\nDisconnected from Modbus server”)
“`
Example 2: Writing Coils
Coils are single-bit outputs used for controlling devices like relays, motors, and LEDs (ON/OFF state).
“`python
save as sync_write_coils.py
from pymodbus.client import ModbusTcpClient
from pymodbus.exceptions import ModbusException
Modbus server parameters
SERVER_HOST = “127.0.0.1”
SERVER_PORT = 502
SLAVE_ID = 1
Coil parameters
COIL_ADDRESS = 0 # Coil to write to
COIL_VALUE = True # True = ON, False = OFF
Create Modbus TCP client
client = ModbusTcpClient(host=SERVER_HOST, port=SERVER_PORT)
try:
# Connect to the server
client.connect()
print(f”Connected to Modbus server at {SERVER_HOST}:{SERVER_PORT}”)
# Write to a single coil
print(f”\nWriting {‘ON’ if COIL_VALUE else ‘OFF’} to coil {COIL_ADDRESS}…”)
response = client.write_coil(
address=COIL_ADDRESS,
value=COIL_VALUE,
slave=SLAVE_ID
)
# Check if write was successful
if not response.isError():
print(f”✓ Successfully wrote {‘ON’ if COIL_VALUE else ‘OFF’} to coil {COIL_ADDRESS}”)
# Read back to verify
read_response = client.read_coils(
address=COIL_ADDRESS,
count=1,
slave=SLAVE_ID
)
if not read_response.isError():
actual_value = bool(read_response.bits[0])
print(f”✓ Verification: Coil {COIL_ADDRESS} is now {‘ON’ if actual_value else ‘OFF’}”)
else:
print(f”✗ Error writing coil: {response}”)
except ModbusException as e:
print(f”Modbus error: {e}”)
except Exception as e:
print(f”General error: {e}”)
finally:
# Close the connection
client.close()
print(f”\nDisconnected from Modbus server”)
“`
Example 3: Advanced Error Handling
Proper error handling is essential for robust Modbus applications. Let’s extend our example with more detailed error handling:
“`python
save as sync_error_handling.py
from pymodbus.client import ModbusTcpClient
from pymodbus.exceptions import (
ModbusException,
ConnectionException,
TimeoutException
)
Modbus server parameters
SERVER_HOST = “127.0.0.1”
SERVER_PORT = 502
SLAVE_ID = 1
Register parameters
START_ADDRESS = 0
REGISTERS_COUNT = 3
Create Modbus TCP client with timeout settings
client = ModbusTcpClient(
host=SERVER_HOST,
port=SERVER_PORT,
timeout=2 # 2 seconds timeout
)
def handle_modbus_error(e, operation):
“””Helper function to handle different Modbus error types”””
if isinstance(e, ConnectionException):
return f”Connection failed for {operation}: {e}”
elif isinstance(e, TimeoutException):
return f”Timeout for {operation}: The server didn’t respond within the timeout period”
elif isinstance(e, ModbusException):
return f”Modbus error for {operation}: {e}”
else:
return f”Unexpected error for {operation}: {e}”
try:
# Connect to the server
if client.connect():
print(f”Connected to Modbus server at {SERVER_HOST}:{SERVER_PORT}”)
# Read holding registers with retries (simple retry mechanism)
max_retries = 3
retry_count = 0
success = False
while retry_count < max_retries and not success:
try:
response = client.read_holding_registers(
address=START_ADDRESS,
count=REGISTERS_COUNT,
slave=SLAVE_ID
)
if not response.isError():
print(f”\nRead {REGISTERS_COUNT} holding registers successfully:”)
for i, value in enumerate(response.registers):
print(f”Register {START_ADDRESS + i}: {value}”)
success = True
else:
print(f”Retry {retry_count + 1}/{max_retries}: Error reading registers: {response}”)
retry_count += 1
except Exception as e:
error_msg = handle_modbus_error(e, “reading registers”)
print(f”Retry {retry_count + 1}/{max_retries}: {error_msg}”)
retry_count += 1
if not success:
print(f”Failed to read registers after {max_retries} attempts”)
else:
print(f”Failed to connect to Modbus server at {SERVER_HOST}:{SERVER_PORT}”)
except Exception as e:
print(f”General error: {e}”)
finally:
# Close the connection
client.close()
print(f”\nDisconnected from Modbus server”)
“`
Asynchronous Modbus Client
Asynchronous clients are more efficient for applications that need to handle multiple connections or perform other tasks while waiting for Modbus responses. They use Python’s `asyncio` library.
Example 4: Async Reading Holding Registers
“`python
save as async_read_holding_registers.py
import asyncio
from pymodbus.client import AsyncModbusTcpClient
from pymodbus.exceptions import ModbusException
Modbus server parameters
SERVER_HOST = “127.0.0.1”
SERVER_PORT = 502
SLAVE_ID = 1
Register parameters
START_ADDRESS = 0
REGISTERS_COUNT = 3
async def async_modbus_client():
“””Async function to read holding registers”””
# Create async Modbus TCP client
client = AsyncModbusTcpClient(host=SERVER_HOST, port=SERVER_PORT)
try:
# Connect to the server
await client.connect()
print(f”Connected to Modbus server at {SERVER_HOST}:{SERVER_PORT}”)
# Read holding registers asynchronously
response = await client.read_holding_registers(
address=START_ADDRESS,
count=REGISTERS_COUNT,
slave=SLAVE_ID
)
# Check if response is valid
if not response.isError():
# Extract and print the register values
print(f”\nRead {REGISTERS_COUNT} holding registers from address {START_ADDRESS}:”)
for i, value in enumerate(response.registers):
print(f”Register {START_ADDRESS + i}: {value}”)
else:
print(f”Error reading registers: {response}”)
except ModbusException as e:
print(f”Modbus error: {e}”)
except Exception as e:
print(f”General error: {e}”)
finally:
# Close the connection
await client.close()
print(f”\nDisconnected from Modbus server”)
Run the async function
if __name__ == “__main__”:
asyncio.run(async_modbus_client())
“`
Example 5: Async Writing Coils
“`python
save as async_write_coils.py
import asyncio
from pymodbus.client import AsyncModbusTcpClient
from pymodbus.exceptions import ModbusException
Modbus server parameters
SERVER_HOST = “127.0.0.1”
SERVER_PORT = 502
SLAVE_ID = 1
Coil parameters
COIL_ADDRESS = 0
COIL_VALUE = True # True = ON, False = OFF
async def async_modbus_client():
“””Async function to write coils”””
# Create async Modbus TCP client
client = AsyncModbusTcpClient(host=SERVER_HOST, port=SERVER_PORT)
try:
# Connect to the server
await client.connect()
print(f”Connected to Modbus server at {SERVER_HOST}:{SERVER_PORT}”)
# Write to a single coil
print(f”\nWriting {‘ON’ if COIL_VALUE else ‘OFF’} to coil {COIL_ADDRESS}…”)
response = await client.write_coil(
address=COIL_ADDRESS,
value=COIL_VALUE,
slave=SLAVE_ID
)
# Check if write was successful
if not response.isError():
print(f”✓ Successfully wrote {‘ON’ if COIL_VALUE else ‘OFF’} to coil {COIL_ADDRESS}”)
# Read back to verify
read_response = await client.read_coils(
address=COIL_ADDRESS,
count=1,
slave=SLAVE_ID
)
if not read_response.isError():
actual_value = bool(read_response.bits[0])
print(f”✓ Verification: Coil {COIL_ADDRESS} is now {‘ON’ if actual_value else ‘OFF’}”)
else:
print(f”✗ Error writing coil: {response}”)
except ModbusException as e:
print(f”Modbus error: {e}”)
except Exception as e:
print(f”General error: {e}”)
finally:
# Close the connection
await client.close()
print(f”\nDisconnected from Modbus server”)
Run the async function
if __name__ == “__main__”:
asyncio.run(async_modbus_client())
“`
Example 6: Async Multiple Device Communication
One of the main advantages of async is efficiently communicating with multiple devices:
“`python
save as async_multiple_devices.py
import asyncio
from pymodbus.client import AsyncModbusTcpClient
from pymodbus.exceptions import ModbusException
Server parameters
SERVER_HOST = “127.0.0.1”
SERVER_PORT = 502
Device configurations (slave IDs and their registers)
devices = [
{“slave_id”: 1, “reg_start”: 0, “reg_count”: 2, “name”: “Temperature Sensor”},
{“slave_id”: 2, “reg_start”: 0, “reg_count”: 2, “name”: “Pressure Sensor”},
{“slave_id”: 3, “reg_start”: 0, “reg_count”: 2, “name”: “Flow Meter”}
]
async def read_device(client, device):
“””Async function to read a single device”””
try:
response = await client.read_holding_registers(
address=device[“reg_start”],
count=device[“reg_count”],
slave=device[“slave_id”]
)
if not response.isError():
print(f”\n{device[‘name’]} (Slave {device[‘slave_id’]}):”)
for i, value in enumerate(response.registers):
print(f” Register {device[‘reg_start’] + i}: {value}”)
else:
print(f”\nError reading {device[‘name’]} (Slave {device[‘slave_id’]}): {response}”)
except Exception as e:
print(f”\nException reading {device[‘name’]} (Slave {device[‘slave_id’]}): {e}”)
async def async_modbus_client():
“””Async function to communicate with multiple devices”””
# Create async Modbus TCP client
client = AsyncModbusTcpClient(host=SERVER_HOST, port=SERVER_PORT)
try:
# Connect to the server
await client.connect()
print(f”Connected to Modbus server at {SERVER_HOST}:{SERVER_PORT}”)
# Read all devices concurrently
print(“\nReading all devices concurrently…”)
# Create tasks for each device read operation
tasks = [read_device(client, device) for device in devices]
# Wait for all tasks to complete
await asyncio.gather(*tasks)
except Exception as e:
print(f”General error: {e}”)
finally:
# Close the connection
await client.close()
print(f”\nDisconnected from Modbus server”)
Run the async function
if __name__ == “__main__”:
asyncio.run(async_modbus_client())
“`
Synchronous vs Asynchronous: Which to Choose?
Let’s compare the two approaches to help you decide which one to use:
| Feature | Synchronous | Asynchronous |
|————-|—————-|——————|
| Complexity | Simple, easy to understand | More complex, requires async/await knowledge |
| Performance | Blocks execution during I/O | Non-blocking, efficient for multiple connections |
| Use Case | Simple applications, single device communication | Complex applications, multiple devices, real-time systems |
| Code Structure | Linear, procedural | Event-driven, concurrent |
| Scalability | Limited by blocking calls | Highly scalable, handles many connections |
| Error Handling | Traditional try-except | Same try-except with async considerations |
When to Use Synchronous
- Scripting: Short-lived scripts that don’t require concurrency
When to Use Asynchronous
- High-performance Systems: Applications requiring maximum throughput
Practical Tips for pymodbus
1. Always Use Timeouts: Set appropriate timeouts to prevent your application from hanging
2. Implement Retry Logic: Network issues happen—add simple retry mechanisms
3. Validate Responses: Always check if responses are errors before accessing data
4. Use Logging: pymodbus has built-in logging—enable it for debugging:
“`python
import logging
logging.basicConfig()
logging.getLogger().setLevel(logging.DEBUG)
“`
5. Test with Simulators: Use Modbus simulators (like Simply Modbus) for development
6. Handle Big/Little Endian: Remember that Modbus uses big-endian by default
7. Close Connections Properly: Always close connections in a finally block
Running the Examples
To run these examples:
1. Start a Modbus Server: Use Simply Modbus TCP Server, pymodbus server, or a real Modbus device
2. Install pymodbus: `pip install pymodbus`
3. Save the Code: Save each example as a .py file
4. Configure Parameters: Update SERVER_HOST, SERVER_PORT, and SLAVE_ID as needed
5. Run the Script: `python script_name.py`
Conclusion
Python’s pymodbus library provides powerful tools for programming Modbus clients. Whether you choose the simple synchronous approach for basic applications or the more efficient asynchronous approach for complex systems, pymodbus has you covered.
Key takeaways:
- Complete Examples: All code snippets are runnable with minimal modifications
With these examples, you’re well-equipped to start building your own Modbus client applications in Python. Remember to practice in a safe test environment (see our previous article) before deploying to production systems.
Happy coding!