Outils pour utilisateurs

Outils du site


kivy_oscpy

Différences

Ci-dessous, les différences entre deux révisions de la page.

Lien vers cette vue comparative

Les deux révisions précédentesRévision précédente
Prochaine révision
Révision précédente
kivy_oscpy [2020/11/02 10:13] – [Extrait] sergekivy_oscpy [2021/08/05 07:30] (Version actuelle) – [Envoi d'un bundle avec un Client] serge
Ligne 2: Ligne 2:
 <WRAP center round box 60% centeralign> <WRAP center round box 60% centeralign>
 **{{tagpage>kivy|Toutes les pages Kivy}}** **{{tagpage>kivy|Toutes les pages Kivy}}**
-**[[http://translate.google.com/translate?hl=&sl=auto&tl=en&u=https%3A%2F%2Fressources.labomedia.org%2Fapprendre_kivy_en_images|English Version]]**+**[[http://translate.google.com/translate?hl=&sl=auto&tl=en&u=https%3A%2F%2Fressources.labomedia.org%2Fkivy_oscpy|English Version]]**
  
 **[[les_pages_kivy_en_details|Les pages Kivy en détails]]** **[[les_pages_kivy_en_details|Les pages Kivy en détails]]**
 </WRAP> </WRAP>
 <WRAP center round box 60% centeralign> <WRAP center round box 60% centeralign>
-**A modern implementation of OSC for python 3**+**A modern implementation of OSC for python 3**\\  
 +**C'est maintenu par les développeurs Kivy, c'est bien fait.**
 </WRAP> </WRAP>
  
-Ce package python fait partie du projet Kivy, mais il peut être utilisé indépendamment. C'est maintenu par les développeurs Kivy, c'est bien fait.+Ce package python fait partie du projet Kivy, mais il peut être utilisé indépendamment. 
  
 ===== Ressources ===== ===== Ressources =====
Ligne 24: Ligne 25:
   * **[[https://github.com/sergeLabo/accelerometer_service_osc|accelerometer_service_osc]]**   * **[[https://github.com/sergeLabo/accelerometer_service_osc|accelerometer_service_osc]]**
  
-====Extrait====+====Extraits====
 Il n'est pas possible d'avoir 2 serveurs sur le même port sur une même machine, chaque script à un  serveur pour recevoir et un client pour envoyer. Il n'est pas possible d'avoir 2 serveurs sur le même port sur une même machine, chaque script à un  serveur pour recevoir et un client pour envoyer.
  
-J'ai écrit quelque part que l'OSC ne supporte pas l'**UTF-8**: c'est une grosse erreur, il suffit d'encoder (en bytes) le string avec <code python> "à@éèù".encode('utf-8')</code> et de décoder avec+J'ai écrit quelque part que l'OSC ne supporte pas l'**UTF-8**. Si l'envoi et la réception se fait en python sans utiliser de bibliothèque tierce, il suffit d'encoder (en bytes) le string, ce qui est réalisé dans l'exemple ci-dessous avec<code python> "à@éèù".encode('utf-8')</code> et de décoder avec
 <code python>data.decode('utf-8')</code> <code python>data.decode('utf-8')</code>
  
 ===main.py=== ===main.py===
-<code python>+[[https://github.com/sergeLabo/accelerometer_service_osc/blob/main/main.py|main.py]]
  
 +<code python>    
 +class Accelerometer(BoxLayout):
 +
 +    def __init__(self, app):
 +        super().__init__()
 +        self.app = app
 +        Clock.schedule_interval(self.update_display, 1/50)
 +        ...
 +
 +    def update_display(self, dt):
 +        root = self.app.get_running_app()
 +        self.ids.activ_sensor.text = f"Capteur actif: {root.sensor}"
 +        
 +    def on_activity(self, act):
 +        """Appelé par on_release: root.on_activity(3) dans accelerometer.kv"""
 +        
 +        r = self.app.get_running_app()
 +        r.client.send_message(b'/activity', [act])
 +          
 +class AccelerometerApp(App):
 +
 +    def build(self):
 +        ...
 +        self.server = OSCThreadServer()
 +        self.server.listen( address=b'localhost',
 +                            port=3003,
 +                            default=True)
 +        ...  
 +        self.server.bind(b'/sensor', self.on_sensor)
 +        self.client = OSCClient(b'localhost', 3001)
 +        
 +    def on_sensor(self, sens):
 +        self.sensor = sens.decode('utf-8')
 </code> </code>
  
 ===service.py=== ===service.py===
 +[[https://github.com/sergeLabo/accelerometer_service_osc/blob/main/service.py|service.py]]
  
 <code python> <code python>
 +...
 +from oscpy.client import OSCClient
 +from oscpy.server import OSCThreadServer
 +...
 +class AccelerometerService:
 +    ...
 +    def init_osc(self):
 +        """Le serveur peut envoyer
 +        mais impossible d'avoir 2 serveurs sur le même port.
 +        """
 +        self.server = OSCThreadServer()
 +        self.server.listen('localhost', port=3001, default=True)
 +        # Les callbacks du serveur
 +        self.server.bind(b'/activity', self.on_activity)
 +        self.server.bind(b'/stop', self.on_stop)
 +        self.server.bind(b'/sensor_enable', self.on_sensor_enable)
 +        # Un simple client
 +        self.client = OSCClient(b'localhost', 3003)
 +    ...
 +    def on_activity(self, msg):
 +        print("activity", msg)
 +        self.activity = int(msg) 
 +    ...
 +    def get_acceleration(self):
 +        if self.status:
 +            a, b, c = 0,0,0
 +            ...
 +            if self.sensor_enabled != 0:
 +                # Set dans les arrays
 +                self.acc_x[self.num] = a
 +                self.acc_y[self.num] = b
 +                self.acc_z[self.num] = c
 +                self.acc_act[self.num] = self.activity
 +                acc_message = [a, b, c, self.activity, self.num]
 +                self.client.send_message(b'/acc', acc_message)
 +    ...   
 +    def run(self):
 +        while self.loop:
 +            self.get_acceleration()
 +            sleep(0.02)
 +            
 +if __name__ == '__main__':
 +    ACCELEROMETER = AccelerometerService()
 +    ACCELEROMETER.run()            
 +</code>
 +
 +=====Bundle=====
 +
 +====Envoi d'un bundle avec un Client====
 +<code python osc.py>
 +from oscpy.client import OSCClient
 +
 +class OscClient:
 +
 +    def __init__(self, **kwargs):
 +
 +        self.ip = kwargs.get('ip', None)
 +        self.port = kwargs.get('port', None)
 +
 +        self.client = OSCClient(self.ip, self.port)
 +
 +    def send_depth(self, depth):
 +        self.client.send_message(b'/depth', [depth])
 +
 +    def send_bundle(self, messages):
 +        bund = []
 +
 +        for i, msg in enumerate(messages):
 +            tag = ('/' + str(i)).encode('utf-8')
 +            print(tag, msg)
 +            bund.append([tag, msg])
 +
 +        self.client.send_bundle(bund)
 +
 +
 +if __name__ == "__main__":
 +
 +    messages = [[3.2, 3, 4], [55.6, 12, 80]]
 +    cli = OscClient(**{'ip': '127.0.0.1', 'port': 8003})
 +    cli.send_bundle(messages)
 +</code>
 +
 +<code bash>
 +b'/0' [3.2, 3, 4]
 +b'/1' [55.6, 12, 80]
 +</code>
 +====Réception d'un bundle sur un Server====
 +<code python osc_server_test.py>
 +from time import sleep
 +from oscpy.server import OSCThreadServer
 +
 +dico = {}
 +def on_tag(*args):
 +    print(args)
 +    tag = int(args[0].decode('utf-8')[1:])
 +    print(tag)
 +    dico[tag] = args[1:]
 +    print(dico)
 +
 +def default_handler(*args):
 +    print("default_handler", args)
 +
 +server = OSCThreadServer()
 +server.listen(b'localhost', port=8003, default=True)
 +server.default_handler = default_handler
 +
 +for i in range(10):
 +    tag = ('/' + str(i)).encode('utf-8')
 +    server.bind(tag, on_tag, get_address=True)
 +
 +while 1:
 +    sleep(0.1)
 +
 +</code>
  
 +<code bash>python3 osc_server_test.py 
 +(b'/0', 3.200000047683716, 3, 4)
 +0
 +{0: (3.200000047683716, 3, 4)}
 +(b'/1', 55.599998474121094, 12, 80)
 +1
 +{0: (3.200000047683716, 3, 4), 1: (55.599998474121094, 12, 80)}
 </code> </code>
  
 {{tag>kivy osc protocole_reseau python sb}} {{tag>kivy osc protocole_reseau python sb}}
kivy_oscpy.1604312010.txt.gz · Dernière modification : 2020/11/02 10:13 de serge