-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathpip-get.py
84 lines (62 loc) · 2.26 KB
/
pip-get.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
import sys
import time
from PyQt4 import QtCore, QtGui
from PyQt4.QtCore import pyqtSignal, pyqtSlot
import threading
def logthread(caller):
print('%-25s: %s, %s,' % (caller, threading.current_thread().name,
threading.current_thread().ident))
class MyApp(QtGui.QWidget):
def __init__(self, parent=None):
QtGui.QWidget.__init__(self, parent)
self.setGeometry(300, 300, 280, 600)
self.setWindowTitle('using threads')
self.layout = QtGui.QVBoxLayout(self)
self.testButton = QtGui.QPushButton("QThread")
self.testButton.released.connect(self.test)
self.listwidget = QtGui.QListWidget(self)
self.layout.addWidget(self.testButton)
self.layout.addWidget(self.listwidget)
self.threadPool = []
logthread('mainwin.__init__')
def add(self, text):
""" Add item to list widget """
logthread('mainwin.add')
self.listwidget.addItem(text)
self.listwidget.sortItems()
def addBatch(self, text="test", iters=6, delay=0.3):
""" Add several items to list widget """
logthread('mainwin.addBatch')
for i in range(iters):
time.sleep(delay) # artificial time delay
self.add(text+" "+str(i))
def test(self):
my_thread = QtCore.QThread()
my_thread.start()
# This causes my_worker.run() to eventually execute in my_thread:
my_worker = GenericWorker(self.addBatch)
my_worker.moveToThread(my_thread)
my_worker.start.emit("hello")
# my_worker.finished.connect(self.xxx)
self.threadPool.append(my_thread)
self.my_worker = my_worker
class GenericWorker(QtCore.QObject):
start = pyqtSignal(str)
finished = pyqtSignal()
def __init__(self, function, *args, **kwargs):
super(GenericWorker, self).__init__()
logthread('GenericWorker.__init__')
self.function = function
self.args = args
self.kwargs = kwargs
self.start.connect(self.run)
@pyqtSlot()
def run(self, *args, **kwargs):
logthread('GenericWorker.run')
self.function(*self.args, **self.kwargs)
self.finished.emit()
# run
app = QtGui.QApplication(sys.argv)
test = MyApp()
test.show()
app.exec_()