Outils pour utilisateurs

Outils du site


kivy_oscpy

Kivy: oscpy

A modern implementation of OSC for python 3

Ce package python fait partie du projet Kivy, mais il peut être utilisé indépendamment.

C'est maintenu par les développeurs Kivy, c'est bien fait.

Ressources

Installation

sudo pip3 install oscpy

Exemple tiré de

Extraits

Il n'est pas possible d'avoir 2 serveurs sur le même port sur une même machine, chaque script à un serveur pour recevoir et un client pour envoyer.

J'ai écrit quelque part que l'OSC ne supporte pas l'UTF-8. Si l'envoi et la réception se fait en python sans utiliser de bibliothèque tierce, il suffit d'encoder (en bytes) le string, ce qui est réalisé dans l'exemple ci-dessous avec

 "à@éèù".encode('utf-8')

et de décoder avec

data.decode('utf-8')

main.py

main.py

class Accelerometer(BoxLayout):
 
    def __init__(self, app):
        super().__init__()
        self.app = app
        Clock.schedule_interval(self.update_display, 1/50)
        ...
 
    def update_display(self, dt):
        root = self.app.get_running_app()
        self.ids.activ_sensor.text = f"Capteur actif: {root.sensor}"
 
    def on_activity(self, act):
        """Appelé par on_release: root.on_activity(3) dans accelerometer.kv"""
 
        r = self.app.get_running_app()
        r.client.send_message(b'/activity', [act])
 
class AccelerometerApp(App):
 
    def build(self):
        ...
        self.server = OSCThreadServer()
        self.server.listen( address=b'localhost',
                            port=3003,
                            default=True)
        ...  
        self.server.bind(b'/sensor', self.on_sensor)
        self.client = OSCClient(b'localhost', 3001)
 
    def on_sensor(self, sens):
        self.sensor = sens.decode('utf-8')

service.py

service.py

...
from oscpy.client import OSCClient
from oscpy.server import OSCThreadServer
...
class AccelerometerService:
    ...
    def init_osc(self):
        """Le serveur peut envoyer
        mais impossible d'avoir 2 serveurs sur le même port.
        """
        self.server = OSCThreadServer()
        self.server.listen('localhost', port=3001, default=True)
        # Les callbacks du serveur
        self.server.bind(b'/activity', self.on_activity)
        self.server.bind(b'/stop', self.on_stop)
        self.server.bind(b'/sensor_enable', self.on_sensor_enable)
        # Un simple client
        self.client = OSCClient(b'localhost', 3003)
    ...
    def on_activity(self, msg):
        print("activity", msg)
        self.activity = int(msg) 
    ...
    def get_acceleration(self):
        if self.status:
            a, b, c = 0,0,0
            ...
            if self.sensor_enabled != 0:
                # Set dans les arrays
                self.acc_x[self.num] = a
                self.acc_y[self.num] = b
                self.acc_z[self.num] = c
                self.acc_act[self.num] = self.activity
                acc_message = [a, b, c, self.activity, self.num]
                self.client.send_message(b'/acc', acc_message)
    ...   
    def run(self):
        while self.loop:
            self.get_acceleration()
            sleep(0.02)
 
if __name__ == '__main__':
    ACCELEROMETER = AccelerometerService()
    ACCELEROMETER.run()            
kivy_oscpy.txt · Dernière modification: 2020/11/02 13:52 de serge