By: Hristo Hristov | Updated: 2023-01-03
Problem
If you are working with time series data with high frequency or some recording of a recurring event, then your dataset might grow tremendously over time. The overall size could reach a total row count of tens or even hundreds of millions. What options do you have as a data professional to export such a dataset to one or multiple csv files?
Solution
Using Python, we can connect to a SQL Server database, run queries, and export the in-memory results. However, the challenge is the limited amount of memory available, as defined by the machine and the environment settings. We will show how a typical go-to solution fails due to this limitation. Then we will show a solution that works, using Python with the pyodbc and dask libraries.
Solution Environment
I have created a database in Azure called TestDB with a single table UserAccessLogs. The table has been created with the following script:
CREATE TABLE [dbo].[UserAccessLogs](
    [RowId] [int] IDENTITY(1,1) NOT NULL,
    [IP] [varchar](20) NOT NULL,
    [UserId] [varchar](20) NOT NULL,
    [Timestamp] [datetime] NULL
) ON [PRIMARY]
GO

ALTER TABLE [dbo].[UserAccessLogs] ADD PRIMARY KEY CLUSTERED
(
    [RowId] ASC
) WITH (STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF, ONLINE = OFF, OPTIMIZE_FOR_SEQUENTIAL_KEY = OFF) ON [PRIMARY]
GO
The data were generated artificially and represent a basic user access log with an IP address, user id and timestamp. Here is a sample:
Additionally, I have created a Jupyter notebook where I will run this experiment. I am running Python 3.10 in a conda virtual environment. First, I will do some imports and set some variables to connect to our database in Azure:
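The imports and connection settings are shown in the notebook as a screenshot; the following is a minimal sketch of what that cell might contain, with placeholder values for the server and credentials that you would replace with your own:

import pyodbc
import pandas as pd
import dask.dataframe as dd

# Connection details for the Azure SQL database (placeholder values)
driver = '{ODBC Driver 17 for SQL Server}'
server = 'your-server.database.windows.net'
database = 'TestDB'
username = 'your-username'
password = 'your-password'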
With these variables ready, we can try importing our data.
Attempt 1 with pandas
We will use pyodbc to establish a connection to the SQL Server instance and query the table. Then, we can use a pandas dataframe object to store the results of our query. First, let us find out (programmatically) the total number of rows we are dealing with. We will do this by using the with keyword to establish a context manager for our database connection. Inside that context, we will execute a simple query with COUNT():
total_rows = 0
with pyodbc.connect('DRIVER='+driver+';SERVER=tcp:'+server+';PORT=1433;DATABASE='+database+';UID='+username+';PWD='+ password) as conn:
    with conn.cursor() as cursor:
        cursor.execute('SELECT COUNT(*) FROM UserAccessLogs')
        row = cursor.fetchone()
        total_rows = int(row[0])
        print(total_rows)
The result is 81 million rows:
The next order of business is reading these data. Pyodbc has a cursor object which represents a database cursor. This cursor has a couple of functions we can consider:
- fetchone(): irrelevant, as it returns only the first row.
- fetchall(): not feasible, as it will try to load all 81 million rows into memory. Well-suited for scenarios with far fewer rows.
- fetchmany(): feasible, as it allows passing an argument for how many rows to be returned at a time.
To use fetchmany(), we must set a variable size. Then we can set a variable iterations, which represents how many times we will call fetchmany(). Naturally, that equals the total number of rows divided by the size. Here iterations will equal 162, meaning the loop will call fetchmany() 162 times, extracting 500,000 rows each time for a total of 81 million rows. If the total row count is not an exact multiple of the size, more complex logic would have to be designed to handle the last batch of rows; one simple way to do that is sketched below.
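This sketch is not part of the original notebook; connection_str stands in for the same pyodbc connection string used above:

# Loop until fetchmany() returns an empty list, so a final batch smaller
# than size is still handled without computing the iteration count up front.
with pyodbc.connect(connection_str) as conn:   # connection_str: placeholder for the pyodbc string above
    with conn.cursor() as cursor:
        cursor.execute('SELECT RowId, IP, UserId, Timestamp FROM UserAccessLogs')
        while True:
            rows = cursor.fetchmany(size)
            if not rows:                       # empty list -> all rows consumed
                break
            batch_df = pd.DataFrame.from_records(rows, columns=['rowid', 'ip', 'userid', 'ts'])
            # ... append or write out batch_df here ...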
Next, we must create an empty dataframe called result_df. The goal is to avoid handling the rows one by one, which would be inefficient. Instead, we take advantage of Python lists and the append method of the dataframe object. Therefore, inside the loop we will use another dataframe called current_df. At each iteration we will append current_df to result_df:
size = 500000
iterations = total_rows // size
result_df = pd.DataFrame(columns=['rowid', 'ip', 'userid', 'ts'])

with pyodbc.connect('DRIVER='+driver+';SERVER=tcp:'+server+';PORT=1433;DATABASE='+database+';UID='+username+';PWD='+ password) as conn:
    with conn.cursor() as cursor:
        cursor.execute('SELECT RowId, IP, UserId, Timestamp FROM UserAccessLogs')
        for i in range(iterations):
            print(f'current iter: {i}')
            rows = cursor.fetchmany(size)
            # column labels in the same order as the SELECT list
            current_df = pd.DataFrame.from_records(rows, columns=['rowid', 'ip', 'userid', 'ts'])
            result_df = result_df.append(current_df)

print(result_df)
Here is what the code looks like:
Initially, it looks good. However, at iteration 50 (about a third of the data) we get an out-of-memory exception, meaning the in-memory size of the target data object result_df grew beyond a certain limit:
Why did we get this error? The answer is that a pandas dataframe typically requires 5 to 10 times as much RAM as the size of the underlying dataset. In our case we have about 4.73 GB of data:
My machine with 16 GB of RAM is poorly suited to handle this in memory. A better-equipped configuration with around 48 GB of free memory could do the trick.
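If you want to sanity-check an estimate like this yourself, pandas can report the in-memory footprint of a single chunk, which can then be scaled to the full row count. A rough sketch, assuming one 500,000-row chunk is already loaded as current_df:

# Measure one chunk and scale the result to the total row count.
chunk_bytes = current_df.memory_usage(deep=True).sum()    # bytes used by one chunk
estimated_gb = chunk_bytes * (total_rows / len(current_df)) / 1024**3
print(f'Estimated in-memory size for all rows: {estimated_gb:.2f} GB')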
Attempt 2 with Dask
Dask is a flexible library for parallel computing in Python. It supports big data collections, such as dataframes, that extend common interfaces like pandas to larger-than-memory environments. Because Dask is designed to work with larger-than-memory dataframes, its read_sql_table() method does not accept an arbitrary text query, only a table name. We can use it to read the whole table, regardless of its size, as a lazily evaluated, partitioned dataframe:
driver = 'ODBC Driver 17 for SQL Server' # remove curly brackets
connection_string = f'mssql+pyodbc://{username}:{password}@{server}/{database}?driver={driver}'
data = dd.read_sql_table('UserAccessLogs', connection_string, index_col='RowId')
You must provide the table name, the connection string, and an index column. This should typically be an indexed column in SQL (e.g., a numeric primary key). Dask will use it to define the partitioning of the table. Here is the result:
It took Dask a bit longer than a minute to partition the whole table. The output in the screenshot above also displays the schema of the table (IP, UserId, Timestamp) and the number of partitions (45). If needed, the number of partitions can be set manually by using the npartitions parameter, as sketched below.
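For example, a manual partition count could be requested like this (a sketch; the value 100 is arbitrary):

# Ask dask for a specific number of partitions instead of letting it
# derive them from the index range (100 is an illustrative value).
data = dd.read_sql_table('UserAccessLogs', connection_string,
                         index_col='RowId', npartitions=100)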
The next and last step is to export the data to a csv file. We can do this by using the to_csv method, which we call on the dask dataframe. It is important to note that one file per partition will be created. On my system this operation took about 63 minutes to complete with the following script:
data.to_csv('export-*.csv')
The asterisk in the file name is used as a placeholder for the sequential partition number. The result is 45 files, equal to the number of partitions:
Additionally, with the single_file parameter set to True, we can output just one single file (a sketch of this follows the list below). While that can be handy, two issues may occur:
- The Excel row limit is 1,048,576 rows. Technically, the csv format can hold more than that; however, the rows beyond that limit will be subject to data loss and will not be accessible with the Excel app. To circumvent this problem, you can use Notepad++ or a similar editor that does not have a hard row limit.
- Any error occurring during the extraction process (e.g., network connectivity disruption or SQL Server resource limitation) will cause the whole operation to fail.
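Here is that single-file variant (a sketch; the output file name is arbitrary):

# Write all partitions into one csv file instead of one file per partition.
data.to_csv('export.csv', single_file=True)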
Given these two complications, you can also consider using dask's read_sql_query() method. It allows you to specify a query instead of reading the whole table by default. Thus, you can, for example, partition your data manually through the query itself.
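As a sketch of that idea: read_sql_query() expects a SQLAlchemy Selectable rather than a raw SQL string, so reading only part of the table might look like the following (the date filter and output file name are purely illustrative):

import sqlalchemy as sa

# Reflect the table so we can build a SQLAlchemy Select object,
# since read_sql_query() does not accept a plain text query.
engine = sa.create_engine(connection_string)
logs = sa.Table('UserAccessLogs', sa.MetaData(), autoload_with=engine)

# Illustrative filter: only a slice of the data, still partitioned on RowId.
query = sa.select(logs.c.RowId, logs.c.IP, logs.c.UserId, logs.c.Timestamp).where(
    logs.c.Timestamp >= '2022-01-01'
)

subset = dd.read_sql_query(query, connection_string, index_col='RowId')
subset.to_csv('export-filtered-*.csv')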
Conclusion
Exporting data from SQL Server is straightforward and quick when there is not a lot of data. An approach with a pandas dataframe and the pyodbc fetchmany() cursor method may work for about 1.5 GB of data on disk, considering the attempt failed when a third of the data was loaded in memory. You should also keep in mind that the size of the data on disk may differ from its size in memory. However, when we enter big data territory, standard libraries and approaches may not always work. In such a case, the dask library can turn your workstation into your own parallel distributed environment that can handle hundreds of millions of rows. Finally, the performance of these operations (read_sql_table or read_sql_query) will depend on the database setup, the network bandwidth, and the machine running dask. Keep in mind that dask will keep a read lock on the database table while writing to the csv file output.
Next Steps
About the author
This author pledges the content of this article is based on professional experience and not AI generated.