DBD::Teradata 2.2.0 provides support for IMPORT multiload (DELETE multiload will be incorporated in a future release). Using Multiload with DBD::Teradata requires some special considerations.
DBD::Teradata does not support recovery logging of utilities; i.e., you cannot PAUSE and then restart DBD::Teradata-based utility applications. If DBD::Teradata detects a failure during a multiload operation, it will automatically submit a RELEASE MLOAD request to attempt to clean up.
The attributes hash provided to tdat_UtilitySetup includes the following keys:
Attribute | Required? | Description |
---|---|---|
Utility | Yes | set to 'MLOAD' |
SQL | Yes | set to an arrayref of multistatement SQL requests. Each multistatement request is treated as a separate Multiload job, like multiple .DML statements in a Multiload script. The SQL may include the DML extension statements defined for the MLOAD utility, such as "DO INSERT FOR MISSING UPDATE ROWS;", "MARK DUPLICATE INSERT ROWS;", and "MARK MISSING DELETE ROWS;" (all used in the examples below). These statements should precede the actual INSERT/UPDATE/DELETE statements. If none of these statements is specified, the default behavior is "MARK DUPLICATE ROWS; MARK MISSING ROWS;" |
Source | Yes | can be set to a subroutine reference, a file description, or a connection handle to be used as the control session of a fastexport job for Loopback (see below); whichever is given acts as the source of the multiload data. A file description is a string consisting of a format keyword followed by the filename, for example "INDICDATA rawdata.dat" or "VARTEXT 'c' vardata.txt", where 'c' is the separator character. Use '|' for compatibility with the default Teradata VARTEXT file format. INDICDATA indicates a fastload formatted file with indicator bytes, and DATA indicates a fastload formatted file without indicator bytes.
If a subroutine reference is specified, the specified subroutine is
responsible for providing a multiload bitmask when binding
parameter data. The bitmask provides a means for an application
to implement the conditional APPLY supported in Multiload scripts
(i.e., "APPLY dmllabel WHERE condition").
The bitmask is supplied to the statement handle using the
In addition, when binding rows, the subroutine is responsible for keeping track of the current size of the output buffer. Assuming the final size of the record can be computed, the driver adds 14 bytes per record for Multiload control information, and the resulting size is multiplied by the number of multiload jobs to which the record is applied. In summary, each record adds the following number of bytes to the output buffer: (record size + 14) * (number of multiload jobs to which the record is applied).
Note that the subroutine supplies each record only once; DBD::Teradata replicates each record based on the bitmask described above. |
SourceFields | Yes | a USING clause string that describes the source record format |
Report | No | a subroutine reference which is called with a status message as the Multiload progresses. |
LogTables | No | a hashref that maps each table to be multiloaded to its work, error, and duplicate tables. The table names are the keys, and the values are arrayrefs with the names of the work/error/duplicate tables (see examples below). The default is the name of each table specified in the SQL requests, prefixed with WT_, ET_, and UV_ for the work, error, and duplicate tables, respectively (table names longer than 27 characters are truncated). Note that the work tables are automatically dropped when the multiload completes; empty error and duplicate tables are dropped as well. |
Checkpoint | No | the number of records processed between each checkpoint |
Sessions | Yes | the number of multiload sessions to use |
CheckpointCallback | No | a subroutine reference that is called each time a checkpoint is taken |
Context | No | a hashref that can contain anything the application desires; it will be passed to the Source subroutine reference each time records are to be collected |
MP | No | when set to a nonzero numeric value or the string 'nothreads', causes multiple processes to be fork()'d, one per multiload session. When set to the string 'threads', causes multiple threads to be spawned, one per multiload session. This attribute may improve performance on certain platforms (especially multiprocessor platforms). |
Loopback | No | when set to a SQL SELECT statement, causes a fastexport job to be created from which data is taken and supplied to the multiload job. Note that the MP attribute must be specified as well. Each forked process gets both a fastexport and a multiload session, and data is transferred directly from the fastexport session to the multiload session. |
ErrorLimit | No | specifies the maximum number of errors to be permitted during the acquisition phase before the multiload is terminated. Default is 1,000,000 errors. |
ActivityCounts | No | specifies a hashref to receive the Insert/Update/Delete activity counts for each MLOAD'ed table. The hashref is keyed by table name, and each table's entry is a hashref with the keys "Inserts", "Updates", and "Deletes"; each value is the total row count applied by that activity to the given table (see example below) |
RequestSize | No | integer size in bytes; sets the maximum request buffer size to use for the utility sessions. Larger values permit more tuples to be sent to the DBMS in a single message. Default is 64256; maximum value is 1,048,000. Setting to a value outside that range, or for pre-V2R6.0 Teradata servers, will be silently ignored. |
Retry | No | either a scalar or an arrayref. A scalar gives the number of seconds to wait before retrying to start the multiload when the prior attempt failed because no utility job slots were available on the database; the scalar form retries indefinitely. In an arrayref, the first element is the number of seconds between retries, and the second element is the maximum number of retry attempts. |
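
As a rough illustration of the LogTables default described in the table above, the following sketch builds the WT_/ET_/UV_ names for a list of target tables. The helper names `default_log_tables` and `default_log_table_map` are hypothetical and not part of DBD::Teradata, and the sketch assumes that truncation to 27 characters is applied to the table name before the prefix is added.

```perl
use strict;
use warnings;

# Sketch of the documented default naming rule: the table name, truncated
# to 27 characters, prefixed with WT_, ET_, and UV_.
# ASSUMPTION: truncation happens before the prefix is added.
sub default_log_tables {
    my ($table) = @_;
    my $base = substr($table, 0, 27);
    return [ map { $_ . $base } qw(WT_ ET_ UV_) ];
}

# Build a LogTables-style hashref for a list of target tables.
sub default_log_table_map {
    my (@tables) = @_;
    return { map { ($_ => default_log_tables($_)) } @tables };
}

my $map = default_log_table_map(qw(alltypetst alltypetst2));
print join(', ', @{ $map->{alltypetst} }), "\n";
# prints: WT_alltypetst, ET_alltypetst, UV_alltypetst
```

A map built this way has the same shape as the LogTables hashref, so it could be edited and passed explicitly when the default names are not wanted.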

# Example: Multiload using a subroutine as the data source
my $ctldbh = DBI->connect('dbi:Teradata:dbc', $username, $password,
    { RaiseError => 0, PrintError => 0, tdat_lsn => 0 });

my %act_counts = ();    # receives each table's activity counts
my $total = $ctldbh->tdat_UtilitySetup(
    {
        Utility  => 'MLOAD',
        Sessions => $sesscount,
        SQL      => [
            'DO INSERT FOR MISSING UPDATE ROWS;
            UPDATE alltypetst SET col4 = :col4 WHERE col1 = :col1;
            INSERT INTO alltypetst VALUES(:col1, :col2, :col3, :col4, :col5, :col6,
                :col7, :col8, :col9, :col10, :col11, :col12, :col13);',
            "MARK DUPLICATE INSERT ROWS;
            INSERT INTO alltypetst2 VALUES(:col1, :col2, :col3, :col4, :col5, :col6,
                :col7, :col8, :col9, :col10, :col11, TIME '10:20:30', :col13);",
            'MARK MISSING DELETE ROWS;
            DELETE FROM alltypetst3 WHERE col1 = :col1;'
        ],
        SourceFields => 'USING (col1 integer, col2 smallint, col3 byteint, col4 char(20),
            col5 varchar(100), col6 float, col7 decimal(2,1),
            col8 decimal(4,2), col9 decimal(8,4), col10 float, col11 date,
            col12 time, col13 timestamp(0))',
        Checkpoint => 20000,
        LogTables  => {
            alltypetst  => [ qw(wt_alltt  et_alltt  uv_alltt)  ],
            alltypetst2 => [ qw(wt_alltt2 et_alltt2 uv_alltt2) ],
            alltypetst3 => [ qw(wt_alltt3 et_alltt3 uv_alltt3) ],
        },
        Report             => \&report_cb,
        Source             => \&ml_get_data,
        CheckpointCallback => \&mlcheckpoint,
        Context            => {
            Count    => \$count,
            Runtime  => \$mlstarted,
            Base     => [ 0, 1000000 ],
            JobCount => 3
        },
        ActivityCounts => \%act_counts,
        MP             => 1,
        Retry          => [ 120, 3 ]    # retry every 2 minutes up to 3 times
    });
#
# display our activity counts
#
if ($total && ($total > 0)) {
    print "\nTable\tInserts\tUpdates\tDeletes\n", '-' x 40, "\n";
    print $_, "\t",
        $act_counts{$_}->{Inserts}, "\t",
        $act_counts{$_}->{Updates}, "\t",
        $act_counts{$_}->{Deletes}, "\n"
        foreach (keys %act_counts);
}
print $ctldbh->errstr
    unless ($total && ($total > 0));

# Checkpoint callback: reports progress and records the elapsed runtime
sub mlcheckpoint {
    my ($function, $rowcount, $ctxt) = @_;
    my $flstarted = $ctxt->{Runtime};

    if ($function eq 'INIT') {
        $$flstarted = time;
        print "$rowcount MLOAD sessions logged on.\n";
    }
    elsif ($function eq 'FINISH') {
        $$flstarted = time - $$flstarted;
        print "$rowcount rows loaded.\n";
    }
    elsif ($function eq 'CHECKPOINT') {
        print "Check point at $rowcount rows\n";
    }
    return 1;
}

# Source callback: binds the next batch of rows for the given session
sub ml_get_data {
    my ($function, $sth, $sessnum, $maxrows, $ctxt) = @_;
    my ($ary, $rc, $rowcnt);

    $sth->{tdat_raw} = $ctxt->{Mode}, return -1
        if ($function eq 'INIT');

    # stop once 10000 rows have been supplied
    return 0 if (($function eq 'MOREDATA') && (${$ctxt->{Count}} >= 10000));

    if ($function eq 'MOREDATA') {
        # collect_recs_each() is an application-supplied helper (not shown)
        # that returns an arrayref of column arrays
        $ary = collect_recs_each($ctxt->{Count},
            ($maxrows > 280) ? 280 : $maxrows, $ctxt->{JobCount}, $ctxt->{Base}->[$sessnum]);
        print "Sending " . ($#{$$ary[0]} + 1) . " rows for session $sessnum...\n";
        foreach my $i (0..$#$ary) {
            $rc = $sth->bind_param_array($i + 1, $$ary[$i]);
        }
        return $#{$$ary[0]} + 1;
    }

    print "Got IO Finish for $sessnum\n" and return -1
        if ($function eq 'FINISH');
    print "Got CHECKPOINT for $sessnum\n" and return -1
        if ($function eq 'CHECKPOINT');
    return 0;
}

# Report callback: prints each status message
sub report_cb {
    my ($msg) = @_;
    print $msg, "\n";
}

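
The Source subroutine above caps each batch at a fixed 280 rows. As a rough sketch of how such a cap could instead be derived from the per-record buffer cost given in the Source description (record size plus 14 bytes, multiplied by the number of jobs the record is applied to), consider the helpers below. The names `buffer_cost` and `max_rows_per_buffer` are hypothetical, and treating the default RequestSize of 64256 bytes as the space available for records is an assumption.

```perl
use strict;
use warnings;

# Per-record Multiload control bytes, as stated in the Source description.
use constant MLOAD_OVERHEAD => 14;

# Bytes one record adds to the output buffer when applied to $jobs jobs.
sub buffer_cost {
    my ($record_len, $jobs) = @_;
    return ($record_len + MLOAD_OVERHEAD) * $jobs;
}

# Largest number of equal-sized records that fit in $buffer_size bytes.
# ASSUMPTION: the whole buffer is available for record data; a real request
# may carry extra framing, so treat the result as an upper bound.
sub max_rows_per_buffer {
    my ($buffer_size, $record_len, $jobs) = @_;
    return int($buffer_size / buffer_cost($record_len, $jobs));
}

my $cost = buffer_cost(200, 3);                  # (200 + 14) * 3 = 642
my $rows = max_rows_per_buffer(64256, 200, 3);   # int(64256 / 642) = 100
print "cost per record: $cost bytes, rows per buffer: $rows\n";
```

For variable-length records, the same cost function can be summed record by record until the running total approaches the buffer size.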

# Example: Multiload from a fastload formatted file with indicator bytes
my $ctldbh = DBI->connect('dbi:Teradata:dbc', $username, $password,
    { RaiseError => 0, PrintError => 0, tdat_lsn => 0 });

my %act_counts = ();    # receives each table's activity counts
my $total = $ctldbh->tdat_UtilitySetup(
    {
        Utility  => 'MLOAD',
        Sessions => $sesscount,
        SQL      => [
            'DO INSERT FOR MISSING UPDATE ROWS;
            UPDATE alltypetst SET col4 = :col4 WHERE col1 = :col1;
            INSERT INTO alltypetst VALUES(:col1, :col2, :col3, :col4, :col5, :col6,
                :col7, :col8, :col9, :col10, :col11, :col12, :col13);',
            "MARK DUPLICATE INSERT ROWS;
            INSERT INTO alltypetst2 VALUES(:col1, :col2, :col3, :col4, :col5, :col6,
                :col7, :col8, :col9, :col10, :col11, TIME '10:20:30', :col13);",
            'MARK MISSING DELETE ROWS;
            DELETE FROM alltypetst3 WHERE col1 = :col1;'
        ],
        SourceFields => 'USING (col1 integer, col2 smallint, col3 byteint, col4 char(20),
            col5 varchar(100), col6 float, col7 decimal(2,1),
            col8 decimal(4,2), col9 decimal(8,4), col10 float, col11 date,
            col12 time, col13 timestamp(0))',
        Checkpoint => 20000,
        LogTables  => {
            alltypetst  => [ qw(wt_alltt  et_alltt  uv_alltt)  ],
            alltypetst2 => [ qw(wt_alltt2 et_alltt2 uv_alltt2) ],
            alltypetst3 => [ qw(wt_alltt3 et_alltt3 uv_alltt3) ],
        },
        Report             => \&report_cb,
        Source             => 'INDICDATA rawdata.dat',
        CheckpointCallback => \&mlcheckpoint,
        ActivityCounts     => \%act_counts,
        MP                 => 'threads'
    });
#
# display our activity counts
#
if ($total && ($total > 0)) {
    print "\nTable\tInserts\tUpdates\tDeletes\n", '-' x 40, "\n";
    print $_, "\t",
        $act_counts{$_}->{Inserts}, "\t",
        $act_counts{$_}->{Updates}, "\t",
        $act_counts{$_}->{Deletes}, "\n"
        foreach (keys %act_counts);
}
print $ctldbh->errstr
    unless ($total && ($total > 0));

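
The activity-count report printed in the examples above can be factored into a small helper. The following is a minimal sketch; `format_activity_counts` is a hypothetical name, and the sample numbers are invented purely to show the documented shape of the ActivityCounts hash (table name mapped to "Inserts", "Updates", and "Deletes").

```perl
use strict;
use warnings;

# Format an ActivityCounts-style hash as an aligned table,
# one line per target table, sorted by table name.
sub format_activity_counts {
    my ($counts) = @_;
    my $out = sprintf("%-20s %10s %10s %10s\n", qw(Table Inserts Updates Deletes));
    $out .= ('-' x 53) . "\n";
    foreach my $table (sort keys %$counts) {
        my $c = $counts->{$table};
        # A missing key is reported as 0 rather than as an undefined value.
        $out .= sprintf("%-20s %10d %10d %10d\n",
            $table, map { $c->{$_} // 0 } qw(Inserts Updates Deletes));
    }
    return $out;
}

# Sample data in the documented shape; the numbers are illustrative only.
my %sample_counts = (
    alltypetst  => { Inserts => 120, Updates => 880, Deletes => 0 },
    alltypetst3 => { Inserts => 0,   Updates => 0,   Deletes => 45 },
);
print format_activity_counts(\%sample_counts);
```

Returning a string instead of printing directly lets the same helper feed a Report-style callback or a log file.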

# Example: Multiload from a VARTEXT file using '|' as the separator
my $ctldbh = DBI->connect('dbi:Teradata:dbc', $username, $password,
    { RaiseError => 0, PrintError => 0, tdat_lsn => 0 });

my %act_counts = ();    # receives each table's activity counts
my $total = $ctldbh->tdat_UtilitySetup(
    {
        Utility  => 'MLOAD',
        Sessions => $sesscount,
        SQL      => [
            'DO INSERT FOR MISSING UPDATE ROWS;
            UPDATE alltypetst SET col4 = :col4 WHERE col1 = :col1;
            INSERT INTO alltypetst VALUES(:col1, :col2, :col3, :col4, :col5, :col6,
                :col7, :col8, :col9, :col10, :col11, :col12, :col13);',
            "MARK DUPLICATE INSERT ROWS;
            INSERT INTO alltypetst2 VALUES(:col1, :col2, :col3, :col4, :col5, :col6,
                :col7, :col8, :col9, :col10, :col11, TIME '10:20:30', :col13);",
            'MARK MISSING DELETE ROWS;
            DELETE FROM alltypetst3 WHERE col1 = :col1;'
        ],
        SourceFields => 'USING (col1 varchar(9), col2 varchar(6), col3 varchar(4),
            col4 varchar(20), col5 varchar(100), col6 varchar(30), col7 varchar(4),
            col8 varchar(7), col9 varchar(10), col10 varchar(30), col11 varchar(10),
            col12 varchar(15), col13 varchar(19))',
        Checkpoint => 20000,
        LogTables  => {
            alltypetst  => [ qw(wt_alltt  et_alltt  uv_alltt)  ],
            alltypetst2 => [ qw(wt_alltt2 et_alltt2 uv_alltt2) ],
            alltypetst3 => [ qw(wt_alltt3 et_alltt3 uv_alltt3) ],
        },
        Report             => \&report_cb,
        Source             => "VARTEXT '|' vardata.txt",
        CheckpointCallback => \&mlcheckpoint,
        ActivityCounts     => \%act_counts,
        MP                 => 1
    });
#
# display our activity counts
#
if ($total && ($total > 0)) {
    print "\nTable\tInserts\tUpdates\tDeletes\n", '-' x 40, "\n";
    print $_, "\t",
        $act_counts{$_}->{Inserts}, "\t",
        $act_counts{$_}->{Updates}, "\t",
        $act_counts{$_}->{Deletes}, "\n"
        foreach (keys %act_counts);
}
print $ctldbh->errstr
    unless ($total && ($total > 0));

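
The example above reads its input from a '|' separated VARTEXT file. For completeness, here is a minimal sketch of producing such a file from Perl data. `write_vartext` is a hypothetical helper, not part of DBD::Teradata. Writing undef as an empty field and rejecting fields that contain the separator are assumptions made for this sketch, since no escaping rule is described here.

```perl
use strict;
use warnings;
use File::Temp qw(tempfile);

# Write rows as separator-delimited text, one record per line.
# ASSUMPTIONS: undef becomes an empty field, and a field containing the
# separator or a newline is rejected rather than escaped.
sub write_vartext {
    my ($path, $sep, $rows) = @_;
    open(my $fh, '>', $path) or die "Cannot open $path: $!";
    foreach my $row (@$rows) {
        my @fields = map { defined $_ ? "$_" : '' } @$row;
        foreach my $f (@fields) {
            die "Field '$f' contains the separator or a newline\n"
                if index($f, $sep) >= 0 || $f =~ /\n/;
        }
        print {$fh} join($sep, @fields), "\n";
    }
    close($fh) or die "Cannot close $path: $!";
    return scalar @$rows;
}

# A temporary file keeps the sketch free of side effects; a real job
# would write the file named in the Source attribute instead.
my ($tmpfh, $tmpname) = tempfile(UNLINK => 1);
close($tmpfh);
my $written = write_vartext($tmpname, '|', [
    [ 1, 10, 1, 'abc', 'some text' ],
    [ 2, 20, 2, 'def', undef ],
]);
print "$written records written to $tmpname\n";
```

Because every field travels as text, the matching SourceFields clause declares each column as varchar, as in the example above.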

# Example: Loopback Multiload fed directly from a fastexport job
my $ctldbh = DBI->connect('dbi:Teradata:dbc', $username, $password,
    { RaiseError => 0, PrintError => 0, tdat_lsn => 0 });

# control session for the fastexport side of the loopback
my $fedbh = DBI->connect("dbi:Teradata:otherdbc", $userid, $passwd,
    { PrintError => 0, RaiseError => 0, AutoCommit => 1, tdat_lsn => 0 });

my $total = $ctldbh->tdat_UtilitySetup(
    {
        Utility  => 'MLOAD',
        Sessions => $sesscount,
        SQL      => [
            'DO INSERT FOR MISSING UPDATE ROWS;
            UPDATE alltypetst SET col4 = :col4 WHERE col1 = :col1;
            INSERT INTO alltypetst VALUES(:col1, :col2, :col3, :col4, :col5, :col6,
                :col7, :col8, :col9, :col10, :col11, :col12, :col13);',
            "MARK DUPLICATE INSERT ROWS;
            INSERT INTO alltypetst2 VALUES(:col1, :col2, :col3, :col4, :col5, :col6,
                :col7, :col8, :col9, :col10, :col11, TIME '10:20:30', :col13);",
            'MARK MISSING DELETE ROWS;
            DELETE FROM alltypetst3 WHERE col1 = :col1;'
        ],
        SourceFields => 'USING (col1 varchar(9), col2 varchar(6), col3 varchar(4),
            col4 varchar(20), col5 varchar(100), col6 varchar(30), col7 varchar(4),
            col8 varchar(7), col9 varchar(10), col10 varchar(30), col11 varchar(10),
            col12 varchar(15), col13 varchar(19))',
        Checkpoint => 20000,
        LogTables  => {
            alltypetst  => [ qw(wt_alltt  et_alltt  uv_alltt)  ],
            alltypetst2 => [ qw(wt_alltt2 et_alltt2 uv_alltt2) ],
            alltypetst3 => [ qw(wt_alltt3 et_alltt3 uv_alltt3) ],
        },
        Loopback           => 'SELECT * FROM sometable',
        Source             => $fedbh,
        Report             => \&report_cb,
        CheckpointCallback => \&mlcheckpoint,
        MP                 => 1    # required when Loopback is used
    });
print $ctldbh->errstr
    unless ($total && ($total > 0));