-
I am trying to get faster COPY FROM throughput with less wasted memory in psycopg2. The basic problem I face is a "tandem mode": first Python fills a BytesIO buffer, and only in the second step is that buffer consumed by copy_from. So what I came up with is this thread construct, which writes bigger chunks into a pipe once in a while (please ignore the table details; it is just to compare raw throughput with psycopg3):

    import os
    import threading

    def threaded(cur, fr):
        # Runs in the worker thread: COPY reads from the read end of the pipe.
        cur.copy_from(fr, 'temp_table', size=131072, columns=('f1','f2','f3','f4','f5','f6','f7','f8','f9','f10'))

    def copy_insert2_new(cur, data):
        cur.execute('CREATE TEMPORARY TABLE temp_table (pk serial, f1 int,f2 int,f3 int,f4 int,f5 int,f6 int,f7 int,f8 int,f9 int,f10 int)')
        r, w = os.pipe()
        fr = os.fdopen(r, "rb")
        fw = os.fdopen(w, "wb")
        t = threading.Thread(target=threaded, args=[cur, fr])
        t.start()
        counter = 0
        lines = []
        for o in data:
            line = f'{o.f1}\t{o.f2}\t{o.f3}\t{o.f4}\t{o.f5}\t{o.f6}\t{o.f7}\t{o.f8}\t{o.f9}\t{o.f10}\n'.encode('utf-8')
            lines.append(line)
            counter += len(line)
            if counter > 131072:
                # Flush the collected lines as one big write.
                fw.write(b''.join(lines))
                lines.clear()
                counter = 0
        if lines:
            fw.write(b''.join(lines))
        # Closing the write end signals EOF to copy_from.
        fw.close()
        t.join()
        fr.close()
        cur.execute('DROP TABLE temp_table')

In my tests this works as intended, and runtime drops from ~3s to ~1.8s for 1M records, fully saturating the postgres process (before, it was at ~50%).

Question:

Edit: With proper pipe buffer alignment to 65536 bytes (the default on Linux), I get it down to ~1.75s (now Python is at 110% CPU time).
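The producer/consumer shape of the construct above can be tried without a database. This is only a minimal sketch: `consume` is a hypothetical stand-in for `cur.copy_from` that just counts bytes, `stream_lines` is an illustrative name, and `CHUNK` assumes the 65536-byte pipe buffer mentioned in the edit.

```python
import os
import threading

CHUNK = 65536  # assumed pipe buffer size (the Linux default mentioned above)

def consume(fr, result):
    # Stand-in for cur.copy_from(fr, ...): drain the pipe and count the bytes.
    total = 0
    while True:
        block = fr.read(CHUNK)
        if not block:  # empty read means the write end was closed
            break
        total += len(block)
    result.append(total)

def stream_lines(lines):
    """Write already-encoded lines into a pipe in batches of about CHUNK bytes."""
    r, w = os.pipe()
    result = []
    with os.fdopen(r, "rb") as fr:
        t = threading.Thread(target=consume, args=(fr, result))
        t.start()
        try:
            # Leaving this block closes the write end, which signals EOF.
            with os.fdopen(w, "wb") as fw:
                batch, size = [], 0
                for line in lines:
                    batch.append(line)
                    size += len(line)
                    if size >= CHUNK:
                        fw.write(b"".join(batch))
                        batch.clear()
                        size = 0
                if batch:
                    fw.write(b"".join(batch))
        finally:
            t.join()
    return result[0]
```

The write end is closed before `join()`, so the reader always sees EOF, even if formatting a line raises.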
-
If you don't touch the cursor in the original thread you "SHOULD" be OK. But what stops you from creating a separate cursor to pass to the thread?
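A minimal sketch of that suggestion, assuming a psycopg2 cursor `cur`: the worker thread gets its own cursor from `cur.connection.cursor()`, so the original cursor is never shared. The helper name `copy_via_thread` and the `chunks` argument (an iterable of bytes already in COPY text format) are hypothetical.

```python
import os
import threading

def copy_via_thread(cur, table, columns, chunks):
    """Feed `chunks` to COPY through a pipe, using a separate cursor in the thread."""
    r, w = os.pipe()
    fr = os.fdopen(r, "rb")
    fw = os.fdopen(w, "wb")
    # Second cursor on the same connection, used only by the worker thread.
    worker_cur = cur.connection.cursor()
    t = threading.Thread(
        target=worker_cur.copy_from,
        args=(fr, table),
        kwargs={"columns": columns},
    )
    t.start()
    try:
        for chunk in chunks:
            fw.write(chunk)
    finally:
        fw.close()  # closing the write end signals EOF to copy_from
        t.join()
        fr.close()
        worker_cur.close()
```

Both cursors still share one connection, so other statements on that connection will likely have to wait until the COPY has finished.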
-
For completeness, this is the fastest I was able to find for psycopg2:

    import os
    import threading
    from io import BytesIO

    def threaded_copy(cur, fr, tname, columns):
        cur.copy_from(fr, tname, size=65536, columns=columns)

    def copy_insert2(cur, data):
        # TODO: to be set from args
        tname = 'temp_table'
        columns = ('f1','f2','f3','f4','f5','f6','f7','f8','f9','f10')
        cur.execute(f'CREATE TEMPORARY TABLE {tname} (pk serial, f1 int,f2 int,f3 int,f4 int,f5 int,f6 int,f7 int,f8 int,f9 int,f10 int)')
        use_thread = False
        payload = bytearray()
        for o in data:
            # TODO: make line formatter customizable
            payload += f'{o.f1}\t{o.f2}\t{o.f3}\t{o.f4}\t{o.f5}\t{o.f6}\t{o.f7}\t{o.f8}\t{o.f9}\t{o.f10}\n'.encode('utf-8')
            if len(payload) > 65535:
                # if we exceed 64k, switch to threaded chunkwise processing
                if not use_thread:
                    r, w = os.pipe()
                    fr = os.fdopen(r, 'rb')
                    fw = os.fdopen(w, 'wb')
                    # the thread gets its own cursor on the same connection
                    t = threading.Thread(target=threaded_copy, args=[cur.connection.cursor(), fr, tname, columns])
                    t.start()
                    use_thread = True
                length = len(payload)
                m = memoryview(payload)
                pos = 0
                while length - pos > 65535:
                    # write all full 64k chunks (in case some line payload went overboard)
                    fw.write(m[pos:pos+65536])
                    pos += 65536
                # carry remaining data forward
                payload = bytearray(m[pos:])
        if use_thread:
            if payload:
                fw.write(payload)
            fw.close()
            t.join()
            fr.close()
        elif payload:
            # small payload: no thread needed, copy from memory directly
            f = BytesIO(payload)
            cur.copy_from(f, tname, size=65536, columns=columns)
            f.close()
        cur.execute(f'DROP TABLE {tname}')

With this, runtime drops to 1.65s for 1M records with 4-digit numbers (~50MB payload). This is only slightly above Python's line formatting speed, which clearly is the limiting factor (line formatting alone takes ~1.50s). The optional threading keeps things snappy for lowish payloads, and saves memory and runtime for big payloads (~1.8 times faster).

@dvarrazzo
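The chunk alignment step in the loop above can be looked at in isolation. A minimal sketch, assuming the same 65536-byte chunk size; `flush_full_chunks` is a hypothetical helper name, and `write` stands for any callable such as `fw.write`:

```python
CHUNK = 65536  # assumed chunk size, matching the pipe buffer alignment above

def flush_full_chunks(payload, write, chunk=CHUNK):
    """Pass every full chunk of `payload` to `write` and return the remainder."""
    m = memoryview(payload)  # slicing a memoryview does not copy the data
    pos = 0
    while len(m) - pos >= chunk:
        write(m[pos:pos + chunk])
        pos += chunk
    # Carry the incomplete tail forward in a fresh bytearray.
    return bytearray(m[pos:])
```

Returning a fresh bytearray for the tail matters: a bytearray cannot be resized while a memoryview still exports its buffer, so appending to the old one would fail.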
-
Using a decorator, you only create a mechanism that is configured at import time. Custom types should be managed at connection scope, because extension types get different OIDs in different databases. That's why loader/dumper registration takes a context as a parameter in Psycopg 3. In psycopg2 the parameter is only available on loaders, not on dumpers, which sort of worked anyway because psycopg2 dumpers don't have to (and cannot) specify OIDs.
Nulls in arrays and Json are different things. Arrays in psycopg2 are another of those things that will not work in COPY, as they use the
That's the
There is no guarantee whatsoever that str(obj) creates a valid Postgres representation of the type, apart from the security concerns.
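To illustrate the last point: `str(None)` gives `'None'` and `str([1, 2])` gives `'[1, 2]'`, neither of which is what COPY's text format expects. Below is a minimal sketch of an explicit formatter for a few scalar types only. `copy_text_field` is a hypothetical helper, not part of psycopg2 or Psycopg 3, and it assumes the default COPY text format (tab delimiter, `\N` for NULL).

```python
def copy_text_field(value):
    """Format one value for COPY ... FROM STDIN in the default text format."""
    if value is None:
        return "\\N"  # default NULL marker of the text format
    if isinstance(value, bool):  # must be checked before int (bool subclasses int)
        return "t" if value else "f"
    if isinstance(value, int):
        return str(value)
    if isinstance(value, str):
        # Escape the backslash first, then the delimiter and line terminators.
        return (
            value.replace("\\", "\\\\")
            .replace("\t", "\\t")
            .replace("\n", "\\n")
            .replace("\r", "\\r")
        )
    # Refuse anything else instead of silently falling back to str().
    raise TypeError(f"no COPY text representation for {type(value).__name__}")
```

Raising on unknown types is a deliberate choice here: a wrong representation produced by `str()` would otherwise only surface as a server-side error, or as bad data.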
-
Some final conclusions from my speed tests: The C version of psycopg3's Secondly @dvarrazzo