1 """
2 A simple parallel processing API for Python, inspired somewhat by the thread
3 module, slightly less by pypar, and slightly less still by pypvm.
4
5 Copyright (C) 2005, 2006, 2007, 2008, 2009 Paul Boddie <paul@boddie.org.uk>
6
7 This program is free software; you can redistribute it and/or modify it under
8 the terms of the GNU Lesser General Public License as published by the Free
9 Software Foundation; either version 3 of the License, or (at your option) any
10 later version.
11
12 This program is distributed in the hope that it will be useful, but WITHOUT
13 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
14 FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
15 details.
16
17 You should have received a copy of the GNU Lesser General Public License along
18 with this program. If not, see <http://www.gnu.org/licenses/>.
19 """
20
21 __version__ = "0.5"
22
23 import os
24 import sys
25 import select
26 import socket
27 import platform
28
29 try:
30 import cPickle as pickle
31 except ImportError:
32 import pickle
33
34 try:
35 set
36 except NameError:
37 from sets import Set as set
38
39
40
42
43
44
47
49
50 "A communications channel."
51
def __init__(self, pid, read_pipe, write_pipe):

    """
    Set up the channel for the process identified by 'pid'. Messages are
    received from 'read_pipe' and sent into 'write_pipe'.
    """

    self.pid, self.read_pipe, self.write_pipe = pid, read_pipe, write_pipe
63
65
66
67
68
69 self.close()
70
72
73 "Explicitly close the channel."
74
75 if self.read_pipe is not None:
76 self.read_pipe.close()
77 self.read_pipe = None
78 if self.write_pipe is not None:
79 self.write_pipe.close()
80 self.write_pipe = None
81
82
def wait(self, options=0):

    """
    Wait for the created process, if any, to exit. The 'options' are passed
    on unchanged to os.waitpid. Nothing is done where the process identifier
    is zero, and an OSError raised while waiting is ignored.
    """

    # A zero identifier means there is no created process to wait for.

    if self.pid == 0:
        return

    try:
        os.waitpid(self.pid, options)
    except OSError:
        pass
92
94
95 "Send the given object 'obj' through the channel."
96
97 pickle.dump(obj, self.write_pipe)
98 self.write_pipe.flush()
99
def send(self, obj):

    """
    Send the given object 'obj' through the channel, then block until an
    acknowledgement is received. (The acknowledgement makes the caller wait,
    thus preventing processes from exiting and disrupting the communications
    channel and losing data.)

    Raise AcknowledgementError, carrying 'obj' as its single argument, if
    the reply received is anything other than "OK".
    """

    self._send(obj)
    if self._receive() != "OK":

        # Instantiate the exception explicitly. The "raise Class, value"
        # statement is rejected by Python 3, and on Python 2 it unpacks a
        # tuple value into separate constructor arguments instead of
        # passing the object that was actually sent.

        raise AcknowledgementError(obj)
112
114
115 "Receive an object through the channel, returning the object."
116
117 obj = pickle.load(self.read_pipe)
118 if isinstance(obj, Exception):
119 raise obj
120 else:
121 return obj
122
124
125 """
126 Receive an object through the channel, returning the object. Send an
127 acknowledgement of receipt. (The acknowledgement makes the sender wait,
128 thus preventing processes from exiting and disrupting the communications
129 channel and losing data.)
130 """
131
132 try:
133 obj = self._receive()
134 return obj
135 finally:
136 self._send("OK")
137
139
140 """
141 A persistent communications channel which can handle peer disconnection,
142 acting as a server, meaning that this channel is associated with a specific
143 address which can be contacted by other processes.
144 """
145
def __init__(self, pid, endpoint, address):

    """
    Set up a persistent channel for the process identified by 'pid', using
    the socket 'endpoint' which is contactable at the given 'address'. The
    channel starts without pipes; the endpoint is put into listening mode
    straight away.
    """

    # No connection exists yet, so both pipes are initially absent.

    Channel.__init__(self, pid, None, None)
    self.address = address
    self.endpoint = endpoint
    self.poller = select.poll()

    # Accept at most one pending connection at a time.

    self.endpoint.listen(1)
157
159
160 "Close the persistent channel and remove the socket file."
161
162 Channel.close(self)
163 try:
164 os.unlink(self.address)
165 except OSError:
166 pass
167
169
170 "Ensure that the channel is capable of communicating."
171
172 if self.read_pipe is None or self.write_pipe is None:
173
174
175
176 endpoint, address = self.endpoint.accept()
177 self.read_pipe = endpoint.makefile("r", 0)
178 self.write_pipe = endpoint.makefile("w", 0)
179
180
181
182 fileno = self.write_pipe.fileno()
183 self.poller.register(fileno, select.POLLOUT | select.POLLHUP | select.POLLNVAL | select.POLLERR)
184
186
187 "Discard the existing connection."
188
189 fileno = self.write_pipe.fileno()
190 self.poller.unregister(fileno)
191 self.read_pipe = None
192 self.write_pipe = None
193 self.endpoint.listen(1)
194
196
197 "Ensure that sending and receiving are possible."
198
199 while 1:
200 self._ensure_pipes()
201 fileno = self.write_pipe.fileno()
202 fds = self.poller.poll(timeout)
203 for fd, status in fds:
204 if fd != fileno:
205 continue
206 if status & (select.POLLHUP | select.POLLNVAL | select.POLLERR):
207
208
209
210 self._reset_pipes()
211 break
212 else:
213 return
214
221
228
229
230
232
233 """
234 A communications exchange that can be used to detect channels which are
235 ready to communicate. Subclasses of this class can define the 'store_data'
236 method in order to enable the 'add_wait', 'wait' and 'finish' methods.
237
238 Once exchanges are populated with active channels, use of the principal
239 methods of the exchange typically cause the 'store' method to be invoked,
240 resulting in the processing of any incoming data.
241 """
242
def __init__(self, channels=None, limit=None, reuse=0, continuous=0, autoclose=1):

    """
    Initialise the exchange, adding each of the optional 'channels' to it.

    The optional 'limit' is the number of active channels beyond which the
    exchange counts as busy; it is observed through the 'add_wait', 'wait'
    and 'finish' methods, which require a subclass defining a working
    'store_data' method.

    A true 'reuse' value causes channels and processes to be reused for
    waiting computations, the callable still being invoked once for each
    computation.

    A true 'continuous' value causes channels and processes to be retained
    after data has been received from them, on the assumption that they
    will communicate more data.

    A false 'autoclose' value prevents channels from being closed when they
    are removed from the exchange - by default they are closed on removal.
    """

    self.limit = limit
    self.reuse = reuse
    self.autoclose = autoclose
    self.continuous = continuous

    # Bookkeeping: queued work, channels by file descriptor, channels
    # removed during the most recent poll, and the poll object itself.

    self.waiting = []
    self.readables = {}
    self.removed = []
    self.poller = select.poll()

    if channels:
        for initial in channels:
            self.add(initial)
279
280
281
def add(self, channel):

    """
    Add the given 'channel' to the exchange, recording it under the file
    descriptor of its read pipe and registering that descriptor for polling.
    """

    # Watch for incoming data as well as hang-up and error conditions.

    events = select.POLLIN | select.POLLHUP | select.POLLNVAL | select.POLLERR

    descriptor = channel.read_pipe.fileno()
    self.readables[descriptor] = channel
    self.poller.register(descriptor, events)
289
291
292 "Return a list of active channels."
293
294 return self.readables.values()
295
def ready(self, timeout=None):

    """
    Poll the registered channels, waiting for at most the optional
    'timeout' in milliseconds (or until communication is possible), and
    return a list of the channels which can be read from.

    Channels reporting a hang-up, invalid descriptor or error condition are
    removed from the exchange and collected in the 'removed' attribute,
    which is reset on every call. Such a channel is still returned as
    readable if it also has pending input and 'autoclose' is false.
    """

    failure = select.POLLHUP | select.POLLNVAL | select.POLLERR

    events = self.poller.poll(timeout)
    self.removed = []
    available = []

    for fd, status in events:
        channel = self.readables[fd]

        # Drop channels whose peer has gone away or which are in error.

        failed = bool(status & failure)
        if failed:
            self.remove(channel)
            self.removed.append(channel)

        if not status & select.POLLIN:
            continue

        # A removed channel has been closed under autoclose, so nothing
        # can be read from it any more.

        if failed and self.autoclose:
            continue

        available.append(channel)

    return available
326
328
329 """
330 Remove the given 'channel' from the exchange.
331 """
332
333 fileno = channel.read_pipe.fileno()
334 del self.readables[fileno]
335 self.poller.unregister(fileno)
336 if self.autoclose:
337 channel.close()
338 channel.wait()
339
340
341
343
344 "Return whether the exchange still has work scheduled or in progress."
345
346 return self.active() or self.waiting
347
349
350 "Return whether the exchange uses as many channels as it is allowed to."
351
352 return self.limit is not None and len(self.active()) >= self.limit
353
355
356 """
357 Add the given 'channel' to the exchange, waiting if the limit on active
358 channels would be exceeded by adding the channel.
359 """
360
361 self.wait()
362 self.add(channel)
363
365
366 """
367 Test for the limit on channels, blocking and reading incoming data until
368 the number of channels is below the limit.
369 """
370
371
372
373 while self.busy():
374 self.store()
375
377
378 """
379 Finish the use of the exchange by waiting for all channels to complete.
380 """
381
382 while self.unfinished():
383 self.store()
384
def store(self, timeout=None):

    """
    Process the incoming data of each ready channel. If the optional
    'timeout' parameter (a duration in milliseconds) is given, wait only
    that long when no channel is ready to provide data.

    Where the exchange has no active channels at all, start queued
    computations instead for as long as the exchange is not busy.
    """

    # Without active channels there is nothing to read, so only queued
    # work can be started.

    if not self.active():
        while self.waiting and not self.busy():
            function, args, kw = self.waiting.pop()
            self.start(function, *args, **kw)
        return

    # Otherwise consume the data and let each channel's completion trigger
    # the next waiting computation.

    for channel in self.ready(timeout):
        self.store_data(channel)
        self.start_waiting(channel)
406
408
409 """
410 Store incoming data from the specified 'channel'. In subclasses of this
411 class, such data could be stored using instance attributes.
412 """
413
414 raise NotImplementedError, "store_data"
415
416
417
419
420 """
421 Get waiting callable and argument information for new processes, given
422 the reception of data on the given 'channel'.
423 """
424
425
426
427 if self.waiting and not self.continuous:
428
429
430
431 callable, args, kw = self.waiting.pop()
432
433
434
435 if self.reuse:
436
437
438
439
440 self.add(channel)
441 channel.send((args, kw))
442
443 else:
444 return callable, args, kw
445
446
447
448
449 elif self.reuse:
450 channel.send(None)
451
452 return None
453
455
456 """
457 Support process creation by returning whether the given 'callable' has
458 been queued for later invocation.
459 """
460
461 if self.busy():
462 self.waiting.insert(0, (callable, args, kw))
463 return 1
464 else:
465 return 0
466
468
469 """
470 Support process creation by returning the given 'channel' to the
471 creating process, and None to the created process.
472 """
473
474 if channel.pid == 0:
475 return channel
476 else:
477 self.add_wait(channel)
478 return None
479
480
481
483
484 """
485 Start a waiting process given the reception of data on the given
486 'channel'.
487 """
488
489 details = self._get_waiting(channel)
490 if details is not None:
491 callable, args, kw = details
492 self.add(start(callable, *args, **kw))
493
494
495
def start(self, callable, *args, **kw):

    """
    Create a new process for the given 'callable', passing on any
    additional arguments, and monitor the channel created between this
    process and the created process. Where the request was queued for
    later invocation instead, no process is created.
    """

    # Only create the process now if the request was not put on the queue.

    if not self._set_waiting(callable, args, kw):
        self.add_wait(start(callable, *args, **kw))
508
510
511 """
512 Create a new process and return the created communications channel to
513 the created process. In the creating process, return None - the channel
514 receiving data from the created process will be automatically managed by
515 this exchange.
516 """
517
518 channel = create()
519 return self._get_channel_for_process(channel)
520
522
523 """
524 Wrap the given 'callable' in an object which can then be called in the
525 same way as 'callable', but with new processes and communications
526 managed automatically.
527 """
528
529 return ManagedCallable(callable, self)
530
532
533 """
534 A mix-in class providing methods to exchanges for the management of
535 persistent communications.
536 """
537
539
540 """
541 Start a waiting process given the reception of data on the given
542 'channel'.
543 """
544
545 details = self._get_waiting(channel)
546 if details is not None:
547 callable, args, kw = details
548 self.add(start_persistent(channel.address, callable, *args, **kw))
549
def start(self, address, callable, *args, **kw):

    """
    Create a new process, located at the given 'address', for the given
    'callable', passing on any additional arguments. Where the request was
    queued for later invocation instead, no process is created.
    """

    # Only create the process now if the request was not put on the queue.

    if not self._set_waiting(callable, args, kw):
        start_persistent(address, callable, *args, **kw)
562
564
565 """
566 Create a new process, located at the given 'address', and return the
567 created communications channel to the created process. In the creating
568 process, return None - the channel receiving data from the created
569 process will be automatically managed by this exchange.
570 """
571
572 channel = create_persistent(address)
573 return self._get_channel_for_process(channel)
574
def manage(self, address, callable):

    """
    Publish the given 'callable' at the given 'address' by wrapping it,
    together with this exchange, in a PersistentCallable. The returned
    object is called in the same way as 'callable'.
    """

    published = PersistentCallable(address, callable, self)
    return published
584
586
587 "Connect to a process which is contactable via the given 'address'."
588
589 channel = connect_persistent(address)
590 self.add_wait(channel)
591
593
594 "A callable managed by an exchange."
595
def __init__(self, callable, exchange):

    """
    Wrap the given 'callable', remembering the 'exchange' that monitors the
    channels created for communications between this and the created
    processes. Note that the 'callable' must be parallel-aware (that is,
    have a 'channel' parameter). Use the MakeParallel class to wrap other
    kinds of callable objects.
    """

    self.callable, self.exchange = callable, exchange
608
610
611 "Invoke the callable with the supplied arguments."
612
613 self.exchange.start(self.callable, *args, **kw)
614
616
617 "A callable which sets up a persistent communications channel."
618
def __init__(self, address, callable, exchange):

    """
    Wrap the given 'callable' for publication at the given 'address',
    remembering the 'exchange' that monitors the channels created for
    communications between this and the created processes, so that when
    this object is called, a background process is started within which
    the 'callable' will run. Note that the 'callable' must be
    parallel-aware (that is, have a 'channel' parameter). Use the
    MakeParallel class to wrap other kinds of callable objects.
    """

    self.address = address
    self.callable, self.exchange = callable, exchange
633
635
636 "Invoke the callable with the supplied arguments."
637
638 self.exchange.start(self.address, self.callable, *args, **kw)
639
641
642 """
643 A callable which sets up a persistent communications channel, but is
644 unmanaged by an exchange.
645 """
646
648
649 """
650 Using the given 'address', wrap the given 'callable'. This object can
651 then be invoked, but the wrapped callable will be run in a background
652 process. Note that the 'callable' must be parallel-aware (that is, have
653 a 'channel' parameter). Use the MakeParallel class to wrap other kinds
654 of callable objects.
655 """
656
657 self.callable = callable
658 self.address = address
659
661
662 "Invoke the callable with the supplied arguments."
663
664 start_persistent(self.address, self.callable, *args, **kw)
665
666
667
668 -class Map(Exchange):
669
670 "An exchange which can be used like the built-in 'map' function."
671
675
677
678 "Remember the channel addition order to order output."
679
680 self.channel_number = 0
681 self.channels = {}
682 self.results = []
683 self.current_index = 0
684
def add(self, channel):

    """
    Add the given 'channel' to the exchange, noting the position at which
    it was added so that its result can be placed accordingly.
    """

    Exchange.add(self, channel)

    # Hand out consecutive positions in order of addition.

    position = self.channel_number
    self.channels[channel] = position
    self.channel_number = position + 1
692
def start(self, callable, *args, **kw):

    """
    Create a new process for the given 'callable', passing on any
    additional arguments, and monitor the channel created between this
    process and the created process.
    """

    # Reserve a placeholder in the results for the forthcoming value before
    # handing over to the general exchange behaviour.

    self.results.append(Undefined)
    Exchange.start(self, callable, *args, **kw)
703
705
706 """
707 Create a new process and return the created communications channel to
708 the created process. In the creating process, return None - the channel
709 receiving data from the created process will be automatically managed by
710 this exchange.
711 """
712
713 self.results.append(Undefined)
714 return Exchange.create(self)
715
def __call__(self, callable, sequence):

    """
    Invoke 'callable' for each element in the 'sequence', wrapping it in a
    MakeParallel instance first unless it already is one. Return this map.
    """

    wrapped = callable
    if not isinstance(wrapped, MakeParallel):
        wrapped = MakeParallel(wrapped)

    # Reset the map's state before starting any computation.

    self.init()

    for element in sequence:
        self.start(wrapped, element)

    return self
735
737
738 "Accumulate the incoming data, associating results with channels."
739
740 data = channel.receive()
741 self.results[self.channels[channel]] = data
742 del self.channels[channel]
743
746
748
749 "Return the next element in the map."
750
751 try:
752 return self._next()
753 except IndexError:
754 pass
755
756 while self.unfinished():
757 self.store()
758 try:
759 return self._next()
760 except IndexError:
761 pass
762 else:
763 raise StopIteration
764
766
767 "Return element 'i' from the map."
768
769 try:
770 return self._get(i)
771 except IndexError:
772 pass
773
774 while self.unfinished():
775 self.store()
776 try:
777 return self._get(i)
778 except IndexError:
779 pass
780 else:
781 raise IndexError, i
782
783
784
786 result = self._get(self.current_index)
787 self.current_index += 1
788 return result
789
791 result = self.results[i]
792 if result is Undefined or isinstance(i, slice) and Undefined in result:
793 raise IndexError, i
794 return result
795
797
798 """
799 An exchange acting as a queue, making data from created processes available
800 in the order in which it is received.
801 """
802
806
808
809 "Accumulate the incoming data, associating results with channels."
810
811 data = channel.receive()
812 self.queue.insert(0, data)
813
816
818
819 "Return the next element in the queue."
820
821 if self.queue:
822 return self.queue.pop()
823
824 while self.unfinished():
825 self.store()
826 if self.queue:
827 return self.queue.pop()
828 else:
829 raise StopIteration
830
832
833 "Return the current length of the queue."
834
835 return len(self.queue)
836
838
839 "A wrapper around functions making them able to communicate results."
840
842
843 """
844 Initialise the wrapper with the given 'callable'. This object will then
845 be able to accept a 'channel' parameter when invoked, and to forward the
846 result of the given 'callable' via the channel provided back to the
847 invoking process.
848 """
849
850 self.callable = callable
851
def __call__(self, channel, *args, **kw):

    """
    Invoke the wrapped callable with the supplied arguments and send its
    result through the given 'channel'.
    """

    result = self.callable(*args, **kw)
    channel.send(result)
857
859
860 """
861 A wrapper around functions making them able to communicate results in a
862 reusable fashion.
863 """
864
def __call__(self, channel, *args, **kw):

    """
    Invoke the wrapped callable with the supplied arguments and send its
    result through the given 'channel'. Then keep receiving further
    (args, kw) pairs from the channel, invoking the callable and sending
    back the result for each, until None is received.
    """

    while 1:
        channel.send(self.callable(*args, **kw))

        # None marks the end of the work; anything else is the next set of
        # arguments.

        request = channel.receive()
        if request is None:
            break
        args, kw = request
875
876
877
879
880 "An exchange which manages persistent communications."
881
882 pass
883
885
886 "A queue which manages persistent communications."
887
888 pass
889
890
891
893
894 """
895 Connect to a process reachable via the given 'address', making the results
896 of which accessible via a queue.
897 """
898
899 queue = PersistentQueue(limit=1)
900 queue.connect(address)
901 return queue
902
def pmap(callable, sequence, limit=None):

    """
    A parallel version of the built-in map function with an optional process
    'limit'. The given 'callable' should not be parallel-aware (that is, have
    a 'channel' parameter) since it will be wrapped for parallel
    communications before being invoked.

    Return the Map exchange obtained by applying 'callable' to the
    'sequence'.
    """

    return Map(limit=limit)(callable, sequence)
917
918
919
# Field names sought at the start of /proc/cpuinfo lines. The position of each
# name is also the position of its value within a processor identifier.

_cpuinfo_fields = ("processor", "physical id", "core id")
921
923
924 """
925 Return the number of distinct, genuine processor cores. If the platform is
926 not supported by this function, None is returned.
927 """
928
929 try:
930 f = open("/proc/cpuinfo")
931 try:
932 processors = set()
933
934
935
936
937 processor = [None, None, None]
938
939 for line in f.xreadlines():
940 for i, field in enumerate(_cpuinfo_fields):
941
942
943
944
945 if line.startswith(field):
946 t = line.split(":")
947 processor[i] = int(t[1].strip())
948 break
949
950
951
952
953 if line.startswith("processor") and processor[0] is not None:
954 processors.add(tuple(processor))
955 processor = [None, None, None]
956
957
958
959 if processor[0] is not None:
960 processors.add(tuple(processor))
961
962 return len(processors)
963
964 finally:
965 f.close()
966
967 except OSError:
968 return None
969
971
972 """
973 Return the number of cores for OpenSolaris 2008.05 and possibly other
974 editions of Solaris.
975 """
976
977 f = os.popen("psrinfo -p")
978 try:
979 return int(f.read().strip())
980 finally:
981 f.close()
982
983
984
986
987 """
988 Create a new process, returning a communications channel to both the
989 creating process and the created process.
990 """
991
992 parent, child = socket.socketpair()
993 for s in [parent, child]:
994 s.setblocking(1)
995
996 pid = os.fork()
997 if pid == 0:
998 parent.close()
999 return Channel(pid, child.makefile("r", 0), child.makefile("w", 0))
1000 else:
1001 child.close()
1002 return Channel(pid, parent.makefile("r", 0), parent.makefile("w", 0))
1003
1005
1006 """
1007 Create a new process, returning a communications channel to both the
1008 creating process and the created process.
1009
1010 This function uses pipes instead of a socket pair, since some platforms
1011 seem to have problems with poll and such socket pairs.
1012 """
1013
1014 pr, cw = os.pipe()
1015 cr, pw = os.pipe()
1016
1017 pid = os.fork()
1018 if pid == 0:
1019 os.close(pr)
1020 os.close(pw)
1021 return Channel(pid, os.fdopen(cr, "r", 0), os.fdopen(cw, "w", 0))
1022 else:
1023 os.close(cr)
1024 os.close(cw)
1025 return Channel(pid, os.fdopen(pr, "r", 0), os.fdopen(pw, "w", 0))
1026
# Choose the process creation and core counting implementations for this
# platform. SunOS gets the pipe-based channel and its own core counting
# function; every other platform uses the socket pair implementation.

if platform.system() != "SunOS":
    create = create_socketpair
    get_number_of_cores = _get_number_of_cores
else:
    create = create_pipes
    get_number_of_cores = _get_number_of_cores_solaris
1033
1035
1036 """
1037 Create a new process, returning a persistent communications channel between
1038 the creating process and the created process. This channel can be
1039 disconnected from the creating process and connected to another process, and
1040 thus can be used to collect results from daemon processes.
1041
1042 In order to be able to reconnect to created processes, the 'address' of the
1043 communications endpoint for the created process needs to be provided. This
1044 should be a filename.
1045 """
1046
1047 parent = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
1048 child = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
1049 child.bind(address)
1050
1051 for s in [parent, child]:
1052 s.setblocking(1)
1053
1054 pid = os.fork()
1055 if pid == 0:
1056 parent.close()
1057 return PersistentChannel(pid, child, address)
1058 else:
1059 child.close()
1060
1061 return Channel(pid, parent.makefile("r", 0), parent.makefile("w", 0))
1062
1064
1065 """
1066 Connect via a persistent channel to an existing created process, reachable
1067 at the given 'address'.
1068 """
1069
1070 parent = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
1071 parent.setblocking(1)
1072 parent.connect(address)
1073 return Channel(0, parent.makefile("r", 0), parent.makefile("w", 0))
1074
1076
1077 """
1078 Terminate a created process, closing the given 'channel'.
1079 """
1080
1081 channel.close()
1082 os._exit(0)
1083
def start(callable, *args, **kw):

    """
    Create a new process which shall start running in the given 'callable'.
    Additional arguments to the 'callable' can be given as additional
    arguments to this function.

    Return a communications channel to the creating process. For the created
    process, supply a channel as the 'channel' parameter in the given
    'callable' so that it may send data back to the creating process. Any
    exception raised by the 'callable' is sent through that channel instead.
    """

    channel = create()

    # The creating process merely receives its end of the channel.

    if channel.pid != 0:
        return channel

    # The created process runs the callable and then hands the channel to
    # exit, whatever the outcome.

    try:
        try:
            callable(channel, *args, **kw)
        except:

            # Deliberately catch everything: whatever went wrong must be
            # reported to the creating process rather than lost.

            channel.send(sys.exc_info()[1])
    finally:
        exit(channel)
1108
1110
1111 """
1112 Create a new process which shall be reachable using the given 'address' and
1113 which will start running in the given 'callable'. Additional arguments to
1114 the 'callable' can be given as additional arguments to this function.
1115
1116 Return a communications channel to the creating process. For the created
1117 process, supply a channel as the 'channel' parameter in the given 'callable'
1118 so that it may send data back to the creating process.
1119
1120 Note that the created process employs a channel which is persistent: it can
1121 withstand disconnection from the creating process and subsequent connections
1122 from other processes.
1123 """
1124
1125 channel = create_persistent(address)
1126 if channel.pid == 0:
1127 close_streams()
1128 try:
1129 try:
1130 callable(channel, *args, **kw)
1131 except:
1132 exc_type, exc_value, exc_traceback = sys.exc_info()
1133 channel.send(exc_value)
1134 finally:
1135 exit(channel)
1136 else:
1137 return channel
1138
1140
1141 """
1142 Close streams which keep the current process attached to any creating
1143 processes.
1144 """
1145
1146 os.close(sys.stdin.fileno())
1147 os.close(sys.stdout.fileno())
1148 os.close(sys.stderr.fileno())
1149
1151
1152 "Wait for all created processes to terminate."
1153
1154 try:
1155 while 1:
1156 os.wait()
1157 except OSError:
1158 pass
1159
1160
1161