Pages

Showing posts with label fast insert. Show all posts
Showing posts with label fast insert. Show all posts

Friday, February 1, 2019

Fast import of billion lines to postgresql

This is the fastest way I've found about importing huge (3+ billion lines) text files in csv-like format in PostgreSQL. It requires python3 and psycopg2

#!/usr/bin/python3
#
# initial stuff by Cac Ko <email@gdpr_protected_stuff> 
# additional by <geroy@horizon9.org>
#
# import very large cvs-like file in postgre database
# 01.02.2019
#
# requires psycopg2 postgresql lib for python
# file format is:
# ----------------------------
# something12:otherthing2345
# something7134:otherthing4243
# .....
# License: Public Domain

import os
import sys
import psycopg2
from psycopg2.extras import execute_values

# use file in tmpfs for faster read/write
POS_FILE = '/run/.position'

# get current position from POS_FILE location
def getPosition():
    result = 0
    if os.path.exists(POS_FILE):
        with open(POS_FILE, 'rb') as f:
            try:
                result = int(f.read())
                print("result = %d" % result)
            except Exception as error: 
                print("Error read():", error)
                result = 0
    return result

# write to this file after each commit()
def storePosition(pos):
    with open(POS_FILE, 'w') as f:
        f.write(str(pos))

# uncomment the following 3 lines if you want perentage / position on stdout
#    y = pos/whole*100
#    print('{0:.2f}%'.format(y), end = '')
#    print(" pos=%s" % (str(pos)))


# store stuff in database
def storeInDb(line, bulk_data):
#    print(line)
    x = []
    x = line.split(':')
    sql = b'INSERT into secrets (somestuff1, somestuff2) VALUES %s'
    try:
        execute_values(cur, sql, bulk_data)
        connection.commit()
    except Exception as error:
        print("Error pri INSERT", error)
        pass

if __name__ == '__main__':
    try:
        if len(sys.argv) < 2:
            print('usage: ./large-file-import.py filename-to-import.csv')
            exit
        f = open(sys.argv[1], 'r', buffering=2000000, errors='replace')
        whole = os.path.getsize(sys.argv[1])
        pos = getPosition()
        print("Start reading from pos=%d" % pos)
        f.seek(pos)
        line = f.readline()
        try:
            bulk_count = 0;
            connection = psycopg2.connect(
                user="postgres", password="somestuff", host="127.0.0.1", port="5433",
database="postgres") cur = connection.cursor() bulk_data = [] split_line = [] while line: split_line = line.split(':') try: l = tuple(split_line) bulk_data.append(l) except (Exception) as error: print(error) pass storePosition(f.tell()) line = f.readline() bulk_count = bulk_count + 1 # store and commit in db after X lines if bulk_count == 9000: storeInDb(line,bulk_data) bulk_count = 0; bulk_data = [] #connection.commit() except (Exception, psycopg2.Error) as error: print("Error while connecting to PostgreSQL", error) finally: if(connection): cur.close() connection.close() print("PostgreSQL connection is closed") except KeyboardInterrupt: storePosition(f.tell()) if(connection): cur.close() connection.close() print("PostgreSQL connection is closed")
I am too lazy to explain how this script works and probably there are some errors but it works for me.