- Posted by redglue
- On July 17, 2017
- bigdata, hadoop, sqoop
Most of us know Sqoop by now (if not, you will learn a little here). It is a well-established tool (on version 1.4.x, as 2.x is mostly a complete rewrite) for importing data in bulk from relational databases into Hadoop and vice versa.
The most common scenario for Sqoop is importing data from the usual relational engines (Oracle, SQL Server, MySQL, or anything else that supports JDBC) into HDFS, or directly into a table in Apache Hive or HBase. Sqoop generates a map-only MapReduce job in which each mapper connects to the database through the JDBC driver and selects its own portion of the data.
Both full table imports and imports restricted by a “where” clause are supported.
Here is a simple example that writes directly to a Hive table (on WASB, Azure Blob Storage) called BAR_PART and automatically creates the table partitioned by the column DATE_LOCAL.
The important part is the --split-by clause, which needs some consideration.
sqoop import --direct --connect "jdbc:oracle:thin:@192.168.1.1:1521:testDB" --password "foobar#2" --username "SYSTEM" --table "FOO.BAR_PART" --verbose --split-by "DATE_LOCAL" --target-dir "wasb://firstname.lastname@example.org/hive/warehouse" --hive-import --hive-table "FOO.BAR_PART" --hive-partition-key "DATE_LOCAL"
- Split-by as key to avoid data skew imports
Sqoop will try to use the table's primary key to split the workload across mappers (4 by default). It takes the minimum and maximum values of the primary key column and divides that range evenly between the mappers, which gives a good distribution as long as the key values themselves are evenly spread. If you think that splitting by primary key will cause data skew and an irregular distribution across mappers, you can use another column, as we do in the previous example.
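To see why an even split of the key range does not guarantee an even split of the rows, here is a minimal sketch of the arithmetic. It is not Sqoop's actual code: split_ranges is a hypothetical helper, it assumes an integer split column, and it assumes there are at least as many key values as mappers.

```shell
#!/usr/bin/env bash
# Sketch only: divide the range [min, max] of an integer split column into
# one contiguous slice per mapper. split_ranges is a hypothetical helper.
split_ranges() {
  local min="$1" max="$2" mappers="$3"
  local span=$(( max - min + 1 ))
  local i=1 lo="$min" hi
  while [ "$i" -le "$mappers" ]; do
    # Upper bound of the i-th slice; the last slice always ends at max.
    hi=$(( min + span * i / mappers - 1 ))
    echo "mapper $i: $lo <= col <= $hi"
    lo=$(( hi + 1 ))
    i=$(( i + 1 ))
  done
}

split_ranges 1 1000 4
# mapper 1: 1 <= col <= 250
# mapper 2: 251 <= col <= 500
# mapper 3: 501 <= col <= 750
# mapper 4: 751 <= col <= 1000
```

Every slice covers 250 key values, but if 90% of the rows have keys above 750, the fourth mapper does almost all the work. That is the skew a better --split-by column avoids.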
Please note that each mapper has its own JDBC connection to the database and retrieves only its portion of the data (combined with your “where” clause, if one is specified), so it is always a good idea to use split-by on the primary key OR the partition key, so that each mapper does not have to scan the entire table.
If the table has neither a primary key nor a partition key to split on, it might be a good idea to use only one mapper, which is controlled by the --num-mappers parameter.
- Don’t saturate your RDBMS with mappers
If you wrongly decide to use too many mappers, or you don’t have a candidate for the --split-by parameter (no PK or partition key), you saturate your RDBMS with sessions that may each run in parallel inside the RDBMS (Full Table Scans or Index Full Scans), and your imports will be much slower than you want.
Start with the default number of mappers (4), increase it only if necessary, and monitor the load on your database. Control the number of mappers with the --num-mappers and --split-by parameters. Keep an eye on RDBMS I/O and make sure that the balance between data ingestion into Hadoop through Sqoop and your production users is right.
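One way to keep the mapper count honest is to wrap the import in a small script. The following is only a sketch under stated assumptions: import_table, MAX_MAPPERS, the password file and the target directory are hypothetical names, and the connection details are placeholders borrowed from the earlier example. By default it prints the command instead of running it.

```shell
#!/usr/bin/env bash
# Sketch of a wrapper that keeps the number of mappers under control.
# import_table, MAX_MAPPERS, the password file and the target directory
# are hypothetical; adapt them to your environment.
import_table() {
  local table="$1" split_col="$2" mappers="${3:-4}"
  local max_mappers="${MAX_MAPPERS:-8}"   # ceiling agreed with the DBA
  local split_args=""

  if [ -z "$split_col" ]; then
    # No primary key or partition key to split on: use a single mapper.
    mappers=1
  else
    split_args="--split-by $split_col"
  fi
  if [ "$mappers" -gt "$max_mappers" ]; then
    mappers="$max_mappers"
  fi

  # SQOOP_BIN defaults to "echo sqoop", so the command is printed, not run.
  # Export SQOOP_BIN=sqoop to execute it for real.
  # $split_args is unquoted on purpose: it expands to two words or to none.
  ${SQOOP_BIN:-echo sqoop} import \
    --connect "jdbc:oracle:thin:@192.168.1.1:1521:testDB" \
    --username "SYSTEM" \
    --password-file "/user/sqoop/oracle.password" \
    --table "$table" \
    $split_args \
    --num-mappers "$mappers" \
    --target-dir "/data/staging/$table"
}

import_table "FOO.BAR_PART" "DATE_LOCAL" 16   # capped at 8 mappers
import_table "FOO.BAR_NOPK" ""                # one mapper, no --split-by
```

Raising the cap then becomes a deliberate decision made together with whoever watches the database, rather than a number typed on the command line.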
- Use correct and up-to-date connectors for your database
The generic JDBC connector is mostly pointless if you want the best performance. For example, the Data Connector for Oracle (or the --direct parameter) gives you much better performance: the Oracle connector can use direct path reads and writes, run parallel queries using Oracle temporary tables, define a minimum number of mappers, etc.
The advice is to read the Sqoop documentation and use the correct connector for your database vendor.
- “Data gets updated” problem
Loading data with Sqoop is rarely a single event, because the data you are importing keeps changing in the source (rows are INSERTed, DELETEd or UPDATEd). What is important here is that HDFS is an “append-only filesystem” (HBase and Hive with ACID are exceptions, but they are mostly tricks), so the options are pretty simple: replace the dataset, add data to the dataset (a new partition, for example) or merge the old and new datasets.
If the data that you are loading is a small dataset, don’t think twice, replace and overwrite it.
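As a rough illustration of the “replace and overwrite” option, a full reload into Hive could look like the sketch below. It relies on the --hive-overwrite option; reload_small_table, the password file and the table name are hypothetical, and the connection string is a placeholder. By default the command is only printed.

```shell
#!/usr/bin/env bash
# Sketch: reload a small table from scratch on every run.
# reload_small_table, the password file and the table name are hypothetical.
reload_small_table() {
  local table="$1"
  # SQOOP_BIN defaults to "echo sqoop", so the command is printed, not run.
  # Export SQOOP_BIN=sqoop to execute it for real.
  ${SQOOP_BIN:-echo sqoop} import \
    --connect "jdbc:mysql://serverDB/sales" \
    --username sales \
    --password-file "/user/sqoop/sales.password" \
    --table "$table" \
    --hive-import \
    --hive-overwrite \
    --hive-table "$table"
}

reload_small_table "countries"
```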
If the data that you are loading is a big dataset, an “incremental” load is recommended. This can be a little tricky, as Sqoop needs to know which modifications were made since the last incremental or full import.
In the example below, the --check-column argument identifies the column that “controls” which rows are subject to the incremental import. Use append mode to import a table where new rows are continually being added with increasing row id values.
sqoop job --create sell_id_job -- import \
  --connect jdbc:mysql://serverDB/sales \
  --username sales --password sales \
  --table sales --check-column id --incremental append
For tables whose rows get updated and that don’t have an ID that tracks the “progress”, you can use the timestamp mode (called lastmodified), where the rows picked up by the incremental load are selected by a timestamp value. Keep in mind that rows physically deleted in the source are not picked up by an incremental import. If you use a saved job, as in the example, the --last-value of either mode is stored between runs, and this is the recommended way to handle incremental loads.
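A saved job in lastmodified mode could be sketched as follows. The job name, the last_update timestamp column, the password file and the target directory are assumptions made for illustration; --merge-key is used here so that updated rows replace their older versions instead of being appended. By default the commands are only printed.

```shell
#!/usr/bin/env bash
# Sketch of a saved job in lastmodified mode.
# Assumptions (not from the original post): the job name, a timestamp
# column called last_update, the password file and the target directory.

# SQOOP_BIN defaults to "echo sqoop", so the commands are printed, not run.
# Export SQOOP_BIN=sqoop to execute them against a real cluster.
create_updates_job() {
  ${SQOOP_BIN:-echo sqoop} job --create sales_updates_job -- import \
    --connect "jdbc:mysql://serverDB/sales" \
    --username sales \
    --password-file "/user/sqoop/sales.password" \
    --table sales \
    --incremental lastmodified \
    --check-column last_update \
    --merge-key id \
    --target-dir "/data/sales"
}

# Each execution imports the rows whose last_update is newer than the
# stored --last-value, then saves the new value for the next run.
run_updates_job() {
  ${SQOOP_BIN:-echo sqoop} job --exec sales_updates_job
}

create_updates_job
run_updates_job
```

The job is created once and then executed on a schedule, so nobody has to track the last imported timestamp by hand.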
- The “Network bottleneck”
This one is easy: you have limited bandwidth between your Hadoop cluster and your relational database. That can be 120 Mbps or less (remember that some Hadoop clusters live only in the cloud, which is a very common trend). If you are close to the network bandwidth limit and add more mappers, the only thing that will increase is your database load, not the speed of the import.
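A quick back-of-the-envelope calculation shows where that limit sits. The helper below is a hypothetical sketch that assumes decimal units (1 GB = 8000 megabits) and a link that is fully available to the import, which is optimistic.

```shell
#!/usr/bin/env bash
# Sketch: lower bound on transfer time imposed by the network alone.
# estimate_transfer_seconds is a hypothetical helper; it ignores protocol
# overhead and assumes the whole link is available to the import.
estimate_transfer_seconds() {
  local size_gb="$1" mbps="$2"
  # 1 GB = 8000 megabits (decimal units); integer division rounds down.
  echo $(( size_gb * 8000 / mbps ))
}

estimate_transfer_seconds 100 120
# 6666
```

Moving 100 GB over a 120 Mbps link takes at least 6666 seconds, about 1 hour and 51 minutes, no matter how many mappers you start.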
- Other options
Sqoop is only one way to move bulk data that resides in relational databases into the Hadoop ecosystem. There are also many commercial connectors from database vendors, as well as SaaS offerings inside the most popular clouds (Azure Data Factory is an example).
Remember that Sqoop is free and open source, and you should contribute (review and submit patches and bug reports).