Witaj Gościu! ( Zaloguj | Rejestruj )

Forum PHP.pl

> Kolejka w PHP,
Prph
post
Post #1





Grupa: Zarejestrowani
Postów: 338
Pomógł: 2
Dołączył: 4.03.2006
Skąd: Łódź

Ostrzeżenie: (0%)
-----


Dawno mnie tu nie było (IMG:style_emoticons/default/smile.gif)

Forumowicze, problem z kolejką w PHP. System, który tworzę przetwarza dane co minutę (z CRONa). Niestety uruchomienie pojedynczego procesu np, przetwarzaj.php nie kończy się w 1 minucie, ponieważ przetwarzanie danych może trwać np. 2 mninuty dla pojedynczego rekordu bazy danych, a rekordów jest np. 20. Teoretycznie szeregowe uruchomienie przetwarzania zajmie więc 40 min. Każdy przetwarzaj.pho bierze z bazy zatem 1 rekord i tylko taki mieli.

Moje pytanie: w jaki sposób napisać kolejkę, która poprawnie przetworzy takie dane? Wywołać z crona 20x ten sam przetwarzaj.php? Zanim proces zacznie wykonywać swoją pracę, ustawia na rekordzie bazy, że jest on przetwarzany, więc inny proces weźmie kolejne dane.

Spotkaliście się z podobnym problemem, macie pomysły?
Go to the top of the page
+Quote Post
 
Start new topic
Odpowiedzi
Crozin
post
Post #2





Grupa: Zarejestrowani
Postów: 6 476
Pomógł: 1306
Dołączył: 6.08.2006
Skąd: Kraków

Ostrzeżenie: (0%)
-----


Aż mnie coś wzięło, żeby zobaczyć jak można zrobić to "normalnie" (czyt: bez forkowania procesów, a z użyciem normalnych wątków):
  1. package com.crozin.concurrent;
  2.  
  3. import java.util.concurrent.BlockingQueue;
  4. import java.util.concurrent.LinkedBlockingQueue;
  5.  
  6. public class RecordsProcessor implements Runnable {
  7. private static final int NUMBER_OF_PROCESSORS = 8;
  8.  
  9. private final BlockingQueue<Integer> processingQueue = new LinkedBlockingQueue<Integer>();
  10.  
  11. private class ProcessorThread extends Thread {
  12. public ProcessorThread(String name) {
  13. super(name);
  14. }
  15.  
  16. @Override
  17. public void run() {
  18. try {
  19. while (true) {
  20. Integer record = processingQueue.take();
  21.  
  22. log("Processing record #" + record);
  23.  
  24. Thread.sleep(new java.util.Random().nextInt(1000) + 1000);
  25. }
  26. } catch (InterruptedException e) {
  27. return;
  28. }
  29. }
  30. }
  31.  
  32. @Override
  33. public void run() {
  34. for (int i = 0; i < NUMBER_OF_PROCESSORS; i++) {
  35. new ProcessorThread("Processor #" + i).start();
  36. }
  37.  
  38. int c = 1;
  39.  
  40. while (true) {
  41. if (processingQueue.size() <= 10) {
  42. log("Refilling the queue");
  43.  
  44. for (int i = 0; i < 100; i++) {
  45. processingQueue.add(c++);
  46. }
  47. }
  48.  
  49. try {
  50. Thread.sleep(2000);
  51. } catch (InterruptedException e) {
  52. return;
  53. }
  54. }
  55. }
  56.  
  57. public static void main(String[] args) throws Exception {
  58. new RecordsProcessor().run();
  59. }
  60.  
  61. public static void log(String msg) {
  62. System.out.println(Thread.currentThread().getName() + ": " + msg);
  63. }
  64. }
Minusy:
- konieczność zrezygnowania z PHP dla tego elementu systemu.

Plusy:
- będzie działać na każdej platformie,
- gwarantuje, że rekordy zostaną przetworzone w odpowiedniej kolejności oraz tylko i wyłącznie jeden raz,
- brak niepotrzebnych przerw w działaniu,
- kontrola nad ilością przetwarzanych rekordów w danym momencie,
- łatwiej to wszystko ogarnąć, wprowadzać zmiany itp.
Go to the top of the page
+Quote Post

Posty w temacie


Reply to this topicStart new topic
2 Użytkowników czyta ten temat (2 Gości i 0 Anonimowych użytkowników)
0 Zarejestrowanych:

 



RSS Aktualny czas: 6.10.2025 - 15:43