I was able to control my 2m/70cm rotatable beam from SatNOGS using PstRotator software and custom protocol proxy written in python. The proxy is an ugly hack but it works for me, so hope it will be useful to someone. The PstRotator software supports a wast amount of rotors, it would be beneficial to use it in SatNOGS. Unfortunately at this moment PstRotator does not support SatNOGS version of hamlib (dont ask), so some proxy required in order to translate rotctl commands to something PstRotator will understand.
My configuration consists of RasPi running SatNOGs and Windows PC running PstRotator. The cheap Channel Master rotator controlled by PstRotator over USB-UIRT module (again, does not matter what rotor you have as long as it is supported by PstRotator).
You will need to specify the ROT_MODEL_NETROTCTL
in your SatNOGS setup, along with the port and IP of your Windows PC (the one which will be running proxy).
In your PstRotator you will need to enable Setup --> UDP control option.
Save the script below to a file and update if you have different port/IP of your Windows box (I mentioned already that this script is an ugly hack).
Works with python 2.7 (python 3 needs some work). To start, issue âpython filename.pyâ
Feel free to share or improve, this is not supported code by any means.
#!/usr/bin/env python2.7
#
# Andriy Rokhmanov - w9km@yahoo.com
# Protocol Proxy between PstRotator and Hamlib rotctl
# Based on:
# Project Horus - Rotator Abstraction Layers
#
# Copyright (C) 2018 Mark Jessop <vk5qi@rfhead.net>
# Released under GNU GPL v3 or later
#
import socket
import time
import logging
from threading import Thread
class PSTRotator(object):
""" PSTRotator communication class """
# Local store of current azimuth/elevation
current_azimuth = None
current_elevation = None
azel_thread_running = False
service_thread_running = False
last_poll_time = 0
def __init__(self, hostname='localhost', port=12000, poll_rate=1, service_port=4533):
""" Start a PSTRotator connection instance """
self.hostname = hostname
self.port = port
self.poll_rate = poll_rate
self.service_port = service_port
self.azel_thread_running = True
self.service_thread_running = True
self.t_rx = Thread(target=self.azel_rx_loop)
self.t_rx.start()
self.t_poll = Thread(target=self.azel_poll_loop)
self.t_poll.start()
#self.t_service = Thread(target=self.service_loop)
#self.t_service.start()
def close(self):
self.azel_thread_running = False
self.service_thread_running = False
def set_azel(self,azimuth,elevation):
""" Send an Azimuth/Elevation move command to PSTRotator """
# Sanity check inputs.
if elevation > 90.0:
elevation = 90.0
elif elevation < 0.0:
elevation = 0.0
if azimuth > 360.0:
azimuth = azimuth % 360.0
# Generate command
pst_command = "<PST><TRACK>0</TRACK><AZIMUTH>%.1f</AZIMUTH><ELEVATION>%.1f</ELEVATION></PST>" % (azimuth,elevation)
logging.debug("Sent command: %s" % pst_command)
# Send!
udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
udp_socket.sendto(pst_command.encode('ascii'), (self.hostname,self.port))
udp_socket.close()
return True
def poll_azel(self):
""" Poll PSTRotator for an Azimuth/Elevation Update """
az_poll_command = "<PST>AZ?</PST>"
el_poll_command = "<PST>EL?</PST>"
try:
udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
udp_socket.sendto(az_poll_command.encode('ascii'), (self.hostname,self.port))
time.sleep(0.2)
udp_socket.sendto(el_poll_command.encode('ascii'), (self.hostname,self.port))
udp_socket.close()
except:
pass
def service_loop(self):
while self.service_thread_running:
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.bind(('127.0.0.1', self.service_port))
s.listen()
conn, addr = s.accept()
with conn:
print('Connected by', addr)
while True:
data = conn.recv(1024)
if not data:
break
logging.debug("Rotctl Sent: %s" % data)
conn.sendall(data)
def azel_poll_loop(self):
while self.azel_thread_running:
self.poll_azel()
#logging.debug("Poll sent to PSTRotator.")
time.sleep(self.poll_rate)
def azel_rx_loop(self):
""" Listen for Azimuth and Elevation reports from PSTRotator"""
s = socket.socket(socket.AF_INET,socket.SOCK_DGRAM)
s.settimeout(1)
s.bind(('',(self.port+1)))
logging.debug("Started PST Rotator Listener Thread.")
while self.azel_thread_running:
try:
m = s.recvfrom(512)
except socket.timeout:
m = None
if m != None:
# Attempt to parse Azimuth / Elevation
#logging.debug("Received: %s" % m[0])
data = m[0].decode('ascii')
if data[:2] == 'EL':
if data[3:]:
#logging.debug("Empty EL string.")
pass
else:
self.current_elevation = float(data[3:])
elif data[:2] == 'AZ':
if data[3:]:
#logging.debug("Empty AZ string.")
pass
else:
self.current_azimuth = float(data[3:])
logging.debug("Closing UDP Listener")
s.close()
def get_azel(self):
return (self.current_azimuth, self.current_elevation)
if __name__ == "__main__":
logging.basicConfig(format='%(asctime)s %(levelname)s:%(message)s', level=logging.DEBUG)
rot = PSTRotator(hostname="192.168.0.208", poll_rate=10)
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.bind(('192.168.0.208', 4533))
sock.listen(1)
while True:
conn, addr = sock.accept()
try:
logging.debug("Connected by %s", addr)
while True:
data = conn.recv(1024)
if not data:
break
for line in data.splitlines():
#logging.debug("Received: %s", line)
if line.startswith('\dump_state'):
logging.debug("dump_state: %s" % line)
dump_state = "10"+"\n"+"11"+"\n"+"0"+"\n"+"359"+"\n"+"0"+"\n"+"90"+"\n"
conn.sendall(dump_state)
elif line.startswith('P'):
logging.debug("Set Position: %s" % line)
pos = line.split(' ')
rot.current_azimuth = pos[1]
rot.current_elevation = pos[2]
conn.sendall("RPRT 0\n")
rot.set_azel(float(pos[1]), float(pos[2]))
#rot.set_azel(120, 20)
elif line.startswith('p'):
logging.debug("Get Position: %s" % line)
if rot.current_azimuth == None:
rot.current_azimuth = "0"
if rot.current_elevation == None:
rot.current_elevation = "0"
current_pos = rot.current_azimuth + "\n" + rot.current_elevation
logging.debug("Reporting: %s", current_pos)
conn.sendall(current_pos + "\n")
else:
logging.debug("Other.. %s" % line)
except:
pass
finally:
conn.close()