FIGYELEM! Ez a dokumentum kizárólag az ELTE IK hallgatók számára oktatási célra készült! Félkész munka, dolgozunk rajta! Terjeszteni, felhasználni máshol a szerzők engedélye nélkül tilos!

Szálak és folyamatok


A programunkat akár részre is oszthatjuk úgy is, hogy bizonyos részeit akár egy időben is futtathatjuk. Erre a Ruby 2 alapvető módszert tartalmaz. Az együttműködő taszkokat szétválaszthatjuk akár a programon belül is, mégpedig szálak segítségével, vagy feloszthatjuk őket különböző programokra a folyamatok segítségével. Nézzük meg mindkét megoldást.

Szálak használata (Multithreading)

Gyakran két dolog egyidejű végrehajtásának legegyszerűbb módja a Ruby szálak használata. Ezek teljesen beépítettek, a Ruby interpreterben implementálja őket. Épp ezért a Ruby szálak teljesen hordozhatóak - nem függenek az operációs rendszertől -, ellenben a natív szálak használata nem jár semmilyen konkrét előnnyel. Tapasztalhatunk olyat is, hogy egy alacsony prioritású szál lehetőséget sem kap a futásra. Ha pedig a szálainkat holtpontba kergetjük, akár az egész folyamat lefagyhat. Ha az egyik szám egy nagy végrehajtási idejű hívást kezdeményez az operációs rendszer felé, az összes szál megállhat, amíg az interpreter vissza nem kapja az irányítást. Azért ne ijesszenek meg a lehetséges problémák - a Ruby szálak könnyűsújú és hatékony módot adnak a programunk párhuzamosítására.

Ruby szálak létrehozása

Egy új szál létrehozása elég egyértelmű. Lássunk egy kódtöredéket, amely letölt néhány weboldalt, mégpedig párhuzamosan. Minden kiadott kérésnél a kód egy különálló szálat nyit a HTTP tranzakció számára.

require 'net/http'
pages = %w( www.rubycentral.com
      www.awl.com
      www.pragmaticprogrammer.com
     )
threads = []
for page in pages
  threads << Thread.new(page) { |myPage|
    h = Net::HTTP.new(myPage, 80)
    puts "Fetching: #{myPage}"
    resp, data = h.get('/', nil )
    puts "Got #{myPage}:  #{resp.message}"
  }
end
threads.each { |aThread|  aThread.join }

Eredménye:

Fetching: www.rubycentral.com
Fetching: www.awl.com
Fetching: www.pragmaticprogrammer.com
Got www.rubycentral.com:  OK
Got www.pragmaticprogrammer.com:  OK
Got www.awl.com:  OK

Vegyük szemügyre kicsit részletesebben ezt a kódot.

Új szálakat a Thread.new metódussal hozhatunk létre. Egy blokkot csatolunk hozzá, amely az új szálban futtatandó kódot tartalmazza. Esetünkben a blokk a net/http könyvtárat használva letölti a főoldalt a megadott honlapokról. Tisztán olvasható, hogy ezek a letöltések párhuzamosan történnek.

Egy szál létrehozásakor a letöltendő HTML lapot adjuk meg paraméterként. Ez a paraméter blokknak myPage néven továbbítódik. Miért tesszük ezt, ahelyett, hogy a page változó értékét használnánk a blokkon belül?

Egy szál megosztozik minden (a szál indulásakor létező) globális-, példány- és lokális változón. Bárki, akinek van kisöccse, igazat adhat abban, hogy az osztozkodás nem mindig jó dolog. Ebben az esetben mindhárom szál osztozna a page változón. Az első szál elindul, és a page változó értéke http://www.rubycentral.com lesz. Eközben, a szálakat létrehozó ciklus még mindig fut. A második lépésben a page változó értéke http://www.awl.com-ra állítódik. Ha az első szál még nem fejezte be a page változó használatát, akkor hirtelen az új értékkel folyatatja a munkát. Ezeket a hibákat nem könnyű felderíteni.

Habár, egy szál blokkjában lérejött lokális változók tényleg lokálisak az adott szálra vonatkozóan - minden szál egy saját másolatot kap az adott változóból. Esetünkben a myPage változó a szál létrejöttekor kap értéket, és minden szál egy saját példányt kap ebből a változóból.

Műveletek szálakkal

Még egy meglepetést tartogat számunkra a program utolsó sora. Miért hívunk join-t (csatlakozini) minden létrehozott szálra?

Amikor egy Ruby program véget ér, minden futó szál bezáródik, mégpedig állapotától függetlenül! Ám megvárhatjuk, amíg egy adott szál véget ér, mégpedig a szál Thread#join metódusának meghívásával. A hívó szál blokkolódik, amíg a megadott szál be nem fejeződik. A join minden kérelmet tartalmazó szálra való hívásával biztosíthatjuk, hogy mindhárom kérelem teljesítésre kerül a program terminálása előtt.

A join-nal kapcsolatban megemlítjük, hogy a szálak manipulálására több hasznos metódus is létezik. Először is, az aktuális szál mindig elérhető a Thread.current metódus használatával. A Thread.list metódussal az összes futtatható, vagy megállítótt szálat tartalmazó listát kaphatunk. Egy adott szál állapotának megállításában pedig a Thread#status, vagy a Thread#alive? lehet segítségünkre.

Továbbá, a Thread#priority= használatával a szál prioritását is megadhatjuk. A magasabb prioritású szálak az alacsonyabb prioritásúak előtt fognak befejeződni. A szálak ütemezéséről, megállításáról és újraindításáról nemsokára beszélünk.

Szál típusú változók

Ahogy az előző részben láttuk, egy szál a létrejöttekor látható minden változóhoz hozzáférhet. A szál blokkjában lokális változók azonban nem osztottak a szálak között.

De mi van akkor, ha egy olyan változóra van szükségünk, amely szálanként különböző, mégis elérhető a többi szál számára is (még a főszál számára is). A Tread osztály lehetővé teszi olyan szál-lokális változók létrehozását, amelyeket név szerint hozhatunk létre, és érhetünk el. A szál objektumot egyszerűen egy hasítóként kezeljük, azaz az []= operátorral írhatjuk az elemeket, az [] operátorral pedig kiolvashatjuk azok értékeit. Ebben a példában minden szál a count változó aktuális értékét tárolja egy szál-lokális, mycount kulccsal jelzett változóban. (Van egy versenyfeltétel ebben a kódban, de még nem beszéltünk a szinkronizációról, szóval ezt most csendben figyelmen kívül hagyjuk.)

count = 0
arr = []
10.times do |i|
arr[i] = Thread.new {
  sleep(rand(0)/10.0)
  Thread.current["mycount"] = count
  count += 1
}
end
arr.each {|t| t.join; print t["mycount"], ", " }
puts "count = #{count}"

Eredménye:

8, 0, 3, 7, 2, 1, 6, 5, 4, 9, count = 10

A fő szál megvárja, míg az al-szálak befejezik tevékenységüket, majd kiírja a count változó értékét. Hogy még érdekesebbé tegyük a dolgot, minden szál egy véletlen időmennyiségig vár, mielőtt kiolvassa a count értékét.

Szálak és kivételek

Mi történik, ha egy szál kezeletlen kivételt vált ki? Nos, ez a http://abort_on_exception flag értékétől függ. Ha az abort_on_exception értéke hamis(false) - ez az alapbeállítás -, akkor a kezeletlen kivétel egyszerűen kilövi az aktuális szálat, a többiek tovább futnak. A következő példában a 3-as számú szál felrobban, és nem generál további kimenetet. A többi szálból azonban tovább is nyomon követhetjük.

  threads = []
    6.times { |i|
      threads << Thread.new(i) {
        raise "Boom!" if i == 3
        puts i
      }
    }
  threads.each {|t| t.join }

Eredménye:

  01
  2
  45prog.rb:4: Boom! (RuntimeError)
    from prog.rb:8:in `join'
    from prog.rb:8
    from prog.rb:8:in `each'
    from prog.rb:8

Az abort_on_exception igazra (true) állításával egy kezeletlen kivétel minden futó szálat leállít. Azaz a 3-as szál leállásával nem generálódik több kimenet.

  Thread.abort_on_exception = true
    threads = []
      6.times { |i|
      threads << Thread.new(i) {
        raise "Boom!" if i == 3
        puts i
      }
    }
  threads.each {|t| t.join }

Eredménye:

  01
  2
  prog.rb:5: Boom! (RuntimeError)
    from prog.rb:7:in `initialize'
    from prog.rb:7:in `new'
    from prog.rb:7
    from prog.rb:3:in `times'
    from prog.rb:3

A szálak ütezemezésének irányítása

Egy jóltervezett alkalmazásban normális körülmények között elég hagyni, hogy a szálak végezzék saját dolgukat. Időzítések építése egy többszálú alkalmazásba általában rossz döntés.

Habár, vannak olyan helyzetek, amikor szükségünk lehet a szálak irányítására. Lehet, hogy a zenegépünknek van egy szála, amely a fényeket irányítja. Mondjuk ideiglenesen le szeretnénk állítani, amikor a zene megáll. Vagy egy klasszikus termelő-fogyasztó rendszerben két szálunk van, ahol a fogyasztónak meg kell állnia, ha a termelő késésben van.

A Thread osztály számos, a szálak ütemezését irányító metódust tartalmaz. A Thread.stop hívása megállítja az aktuális szálat, míg a Thread#run egy adott szálat indít el. A Thread.pass az aktuális szálat kiveszi az ütemezésből, lehetőséget adva a többieknek a zavartalan futásra, valamint a Thread#join és a Thread#value felfüggeszti a hívó szálat, amíg egy megadott szál be nem fejeződik.

Ezeket a funkciókat a következő, teljesen értelmetlen programon mutatjuk be.

t = Thread.new { sleep .1; Thread.pass; Thread.stop; }
t.status
»   "sleep"
t.run
t.status
»   "run"
t.run
t.status
»  false

Ezen kezdetleges eszközök használata bármiféle szinkronizáció elérésére legjobb esetben is mindent vagy semmit játék. A versenyhelyzetek mindig a legváratlanabb pillanatban támadnak. Osztott adatok használata esetén pedig a versenyhelyzetek garantálják a hosszú és gyötrelmes hibakeresést. Szerencsére a szálak rendelkeznek még egy szolgáltatással: a kritikus szakasz ötletével. Ezek használatával számos biztonságos szinkronizációs rendszert építhetünk.

Kölcsönös kizárás

Egy adott szál kivételével minden más szál blokkolására a legalacsonyabb szintű megoldás egy globális kritikus feltétel használatával működik. Amikor a feltétel true-ra értéket kap (a Thread.critical= metódus használatával), akkor az ütemező egyetlen létező szálnak sem ad lehetőséget a futásra. Ellenben ez nem akadályozza meg ezután új szálak létrehozását, és valamint azok futását. Bizonyos szálműveletek (mint például egy szál leállítása, vagy kilövése, az aktuális szálon belüli várakoztatás (sleep), vagy egy kivétel kiváltása) újra ütemezetté teszik a szálat, még kritikus szekcióban is.

A Thread.critical= közvetlen használata természetesen lehetséges, de nem igazán szokványos. Szerencsére a Ruby számos alternatívával rendelkezik. Ezek közül a két legjobb: a Mutex osztály, valamint a ConditionVariable osztály a Thread könyvtármodulban elérhetőek.

A "Mutex" osztály

A Mutex osztály egy egyszerű szemafort valósít meg egy osztott erőforrás használatának kölcsönös kizárására. Azaz egyszerre legfeljebb egy szál rendelkezhet az zár felett. A többi szál vagy beáll egy várakozási sorba, amíg az zár szabaddá válik, vagy egyszerűen azonnali hibaüzenetet kap, jelezvén, hogy a zár nem elérhető.

A mutexeket általában akkor használjuk, amikor egy osztott adat frissítésének atominak kell lennie. Mondjuk, hogy frissítenünk kell két változót egy tranzakció részeként. Ezt egy triviális programban szimulálhatjuk néhány számláló növelésével. A frissítéseknek atominak kellene lenniük - a külvilágnak sosem szabadna látnia a számlálókat különböző értékekkel. Bármilyen mutex irányítás nélkül ez gyakorlatilag lehetetlen.

  count1 = count2 = 0
  difference = 0
  counter = Thread.new do
    loop do
      count1 += 1
      count2 += 1
    end
  end
  spy = Thread.new do
    loop do
    difference += (count1 - count2).abs
    end
  end
  sleep 1
  Thread.critical = 1
  count1
»  184846
  count2
»  184846
  difference
»  58126

Ez a példa azt mutatja, hogy a spy szál többször éledt fel, és azt vette észre, hogy a count1 és count2 szál inkonzisztens.

Szerencsére ezt kijavíthatjuk mutexek használatával.

require 'thread'
mutex = Mutex.new
  count1 = count2 = 0
  difference = 0
  counter = Thread.new do
    loop do
      mutex.synchronize do
        count1 += 1
        count2 += 1
      end
    end
  end
  spy = Thread.new do
    loop do
      mutex.synchronize do
        difference += (count1 - count2).abs
      end
    end
  end
  sleep 1
  mutex.lock
  count1
»  21192
  count2
»  21192
  difference
»  0

Az osztott adathoz való mindenféle hozzáférés mutex irányítása alá helyezésével biztosítjuk a konzisztenciát. Sajnálatos módon azonban, mint azt a számokból is leolvashatjuk, elég jelentős teljesítménybeli romlást is tapasztalhatunk.

Feltétel-változók ("ConditionVariable")

Néha egy kritikus adat mutex-szel való védelme nem elég. Tegyük fel, hogy egy kritikus szakaszban vagyunk, de várnunk kell bizonyos erőforrásra. Ha a szálunk elalszik várakozás közben, előfordulhat, hogy semelyik másik szál nem tudja felszabadítani az erőforrást, mert nem tud belépni a kritikus szakaszba - az eredeti szál még mindig blokkolja. Képesnek kell lennünk egy kritikus terület kizárólagos felhasználásának ideiglenes feladására, és eközben meg kell mondanunk másoknak, hogy egy adott erőforrásra várunk. Amikor pedig az erőforrás elérhetővé válik, képesnek kell lennünk egy lépésben megragadni azt, és újra megszereznünk a kritikus terület felhasználásának jogát.

Itt lép be feltétel-változó. Egy feltétel-változó egyszerűen egy erőforráshoz kapcsolódó szemafor, amely egy adott mutex védelme alatt áll. Amikor egy elérhetetlen erőforrásra van szükségünk, akkor egy feltétel-változón várakozunk. Ez felszabadítja a kapcsolódó mutex zárját. Amint egy másik szál jelzi, hogy az erőforrás felszabadult, az eredeti szál kilép a várakozásból, és ezzel egyidőben újra megszerzi a kritikus terület zárját.

  require 'thread'
  mutex = Mutex.new
  cv = ConditionVariable.new
  a = Thread.new {
    mutex.synchronize {
      puts "A: I have critical section, but will wait for cv"
      cv.wait(mutex)
      puts "A: I have critical section again! I rule!"
    }
  }
  puts "(Later, back at the ranch...)"
  b = Thread.new {
    mutex.synchronize {
      puts "B: Now I am critical, but am done with cv"
      cv.signal
      puts "B: I am still critical, finishing up"
    }
  }
  a.join
  b.join

Eredménye:

  A: I have critical section, but will wait for cv(Later, back at the ranch...)
  B: Now I am critical, but am done with cv
  B: I am still critical, finishing up
  A: I have critical section again! I rule!

A szinkronizációs mechanizmusok alternatív megoldásaihoz lásd a monitor.rb, és a sync.rb fájlokat a Ruby lib alkönyvtárban.

Több processz futtatása

Olykor egy feladatot processz méretű darabokra szeretnénk tördelni, vagy esetleg egy olyan processzt szeretnénk futtatni, amely nem Ruby-ban íródott. Nem probléma: a Ruby számos olyan eljárással rendelkezik, melyekkel különböző processzeket indíthatunk, és vezérelhetünk.

Új processz indítása

Több módon is létrehozhatunk különálló processzt. Ennek legkönnyebb módja, hogy elindítunk egy parancsot, és megvárjuk, és megvárjuk, míg befejeződik. Azon kaphatjuk magunkat eközben, hogy egy külön parancsot futtatunk, vagy adatot szeretnénk visszanyerni a rendszerből. A Ruby megteszi ezt nekünk a system és a ` metódusok segítségével.

system("tar xzf test.tgz") » tar: test.tgz: Cannot open: No such file or directory\ntar: Error is not recoverable: exiting now\ntar: Child returned status 2\ntar: Error exit delayed from previous errors\nfalse
  result = `date`
  result
»  "Sun Jun  9 00:08:50 CDT 2002\n"

A Kernel::system egy alprocesszben elindítja megadott parancsot, és true-val tér vissza, ha megtalálta, és helyesen le is futott, egyébként false-t ad vissza. Hiba esetén az alprocessz kilpési kódját a globális $? változóban találjuk.

A system metódussal az a probléma, hogy a kimenete ugyanoda megy, ahová az eredeti processzé, amit esetleg nem akarunk. Egy alprocessz kimenetét a ` metódus használatával kaphatjuk el, mint például az előző példában a `date` esetén. Ne felejtsük el, hogy a String#chomp használatával a sorvége karaktereket eltávolíthatjuk.

Oké, ez az egyszerű eseteket megoldja. Futtathatunk más processzeket, és megkaphatjuk az eredményt. De sokszor ennél több vezérlésre van szükségünk. Szeretnénk párbeszédet folytatni az alprocesszel, valószínüleg adatot is akarunk küldeni neki, és szeretnénk vissza is kapni. Az IO.open metódus pont ezt teszi. Az open egy parancsot alprocesszként futtat, és az adott alprocessz alapértelmezett be- és kimenetét egy IO objektumhoz köti. Írjunk az adott IO objektumra, és az alprocessz az alapértelmezett bemeneten tudja olvasni. Bármi, amit az alprocessz ír elérhető a Ruby programban az IO objektumból.

Például a rendszereinken az egyik leghasznosabb kellék a pig, egy program, amely szavakat olvas a bemenetről, és disznó Latinban írja ki. Ezt akkor használhatjuk, ha a Ruby programjainknak olyan üzenetet kell küldeniük, amit az 5 éves srácaink nem érthetnek meg.

  pig = IO.popen("pig", "w+")
  pig.puts "ice cream after they go to bed"
  pig.close_write
  puts pig.gets

Eredménye:

  iceway eamcray afterway eythay ogay otay edbay

Ezen a példán jól látható a az alprocesszek csatornákon (pipe) keresztüli vezérlésének látszólagos egyszerűsége, és a valós összetettsége. A kód elég egyszerűnek tűnik: megnyitunk egy csatornát, írunk rá egy kifejezést, és leolvassuk a választ. De kiderül, hogy a pig program nem üríti (flush) az általa írt kimenetet. Ebben a példában az eredeti kisérletünk, amiben egy pig.puts parancsot egy pig.gets követ, örökre felfüggesztve marad. A pig program feldolgozta a bemenetünket, de a válasza sosem került a csatornára. Be kellett illesztenünk a pig.close_write parancsot. Ez egy filevége üzenetet küld a pig bemenetére, és a keresett kimenet megjelenik, amint a pig terminál.

Van még egy csavar a popen-nel kapcsolatban. Ha az átadott parancs egy egyszerű minusz jel (--), a popen egy új Ruby interpretert indít (fork). Mindkettő a popen hívása utáni résznél folytatódik. Az eredeti processz egy IO objektumot kap vissza, míg a gyerek (child) egy nil-t kap.

  pipe = IO.popen("-","w+")
  if pipe
    pipe.puts "Get a job!"
    $stderr.puts "Child says '#{pipe.gets.chomp}'"
  else
    $stderr.puts "Dad says '#{gets.chomp}'"
    puts "OK"
  end

Eredménye:

  Dad says 'Get a job!'
  Child says 'OK'

A popen-nel kapcsolatban a hagyományos Unix hívás a Kernel::fork, a Kernel::exec, és az IO.pipe elérhetőek azon platformokon, amelyek támogatják őket. Egy konvencionális eljárás fájlok elnevezésére sok IO metódusban és a Kernel::open szintén alprocesszt indítanak ha egy | karaktert teszünk a fájl neve elé. Jegyezzük meg, hogy nem hozhatunk létre csatornát a File.new metódussal. Az csak fájlokra van.

Független gyerekek

Vannak alkalmak, amikor nincs szükségünk ilyen szoros felügyeletre: szeretnénk kiadni az alprocessznek a feladatot, és folytatni a saját dolgunk. Később benézünk, hogy végzett-e már. Például szeretnénk kirúgni egy sokáig tartó rendezést.

  exec("sort testfile > output.txt") if fork == nil
  # A rendezés most már a gyerek processzben fut
  # folytathatjuk a munkánkat
  # megvárjuk, míg befejeződik a rendezés
  Process.wait

A Kernel::fork egy processz azonosítóval (process id) tér vissza a szülő processzben, és nil-lel a gyerek folyamatban, amely így egy Kernel::exec hívást végrehajtva lefuttatja a rendezést. Kicsivel később kiadjuk a Process::wait utasítást, amely megvárja a rendezés befejezését (és visszaadja a processz azonosítóját).

Ha várakozás helyett inkább arról szeretnénk értesítést kapni, ha egy gyerek kilép, felállíthatunk egy Kernel::trap szignálkezelőt (signal handler). A következő példában egy csapdát (trap) állítunk a SIGCLD folyamatra, amely a gyerek folyamat terminálása eseményre állított szignál (death of child process).

  trap("CLD") {
    pid = Process.wait
    puts "Child pid #{pid}: terminated"
    exit
  }
  exec("sort testfile > output.txt") if fork == nil
  # csinálj valami mást...

Eredménye:

  Child pid 31842: terminated

Blokkok és alfolyamatok

Az IO.popen a File.open metódushoz hasonló módon egy blokkal dolgozik. Adjunk át egy parancsot a popen-nek, mint például a date, és a blokk egy IO objektumnak adódik át paraméterként.

  IO.popen ("date") { |f| puts "Date is #{f.gets}" }

Eredménye:

  Date is Sun Jun  9 00:08:50 CDT 2002

Az IO objektum automatikusan bezáródik, amikor a blokk kódja kilép, mint a File.open-nél.

Ha egy blokkot a Kernel::fork metódussal kötünk össze, akkor a blokk kódja egy Ruby alfolyamatként futtatódik, a szülő pedig a blokk utáni résszel folytatja a futását.

  fork do
    puts "In child, pid = #$$"
    exit 99
  end
  pid = Process.wait
  puts "Child terminated, pid = #{pid}, exit code = #{$? &bg;&bg; 8}"

Eredménye:

  In child, pid = 31849
  Child terminated, pid = 31849, exit code = 99

Még egy utolsó dolog. Miért csúsztatjuk egy a kilépési kódot a $? változóban 8 bittel jobbra megjelenítés előtt? Ez a Posix rendszerek jellemzője: az alsó 8 bitje a program terminálásának okát tartalmazza, míg a felső 8 bit az aktuális kilépési kódot.

< Előző oldalKövetkező oldal >