mardi 24 février 2015

Spring Batch ItemReader using PostgreSQL function cursor error



I need to have a Spring Batch ItemReader use a PostgreSQL function as the data source, but I have not been successful (the PostgreSQL database is within a Greenplum appliance). I have created an example of an ItemReader calling a PostgreSQL function that returns a cursor, but the ItemReader fails stating that the cursor does not exist. The following are all of the components of my setup:


-Database table and data:



CREATE TABLE test_user
(
test_user_sys_id numeric NOT NULL,
ssn character varying(9) NOT NULL,
create_user_id character varying(30) NOT NULL,
create_ts timestamp(6) without time zone NOT NULL DEFAULT now()
)
WITH (
OIDS=FALSE
)
DISTRIBUTED BY (ssn);

INSERT INTO test_user (test_user_sys_id, ssn, create_user_id) VALUES (1,'111111111','DataAdmin');
INSERT INTO test_user (test_user_sys_id, ssn, create_user_id) VALUES (2,'222222222','DataAdmin');
INSERT INTO test_user (test_user_sys_id, ssn, create_user_id) VALUES (3,'333333333','DataAdmin');


-Database function:



CREATE or REPLACE FUNCTION get_user_func_no_arg(
p_id_min NUMERIC -- Minimum ID value
)
RETURNS REFCURSOR
AS

$BODY$
DECLARE
resultCursor REFCURSOR := 'resultCursor';
BEGIN
OPEN resultCursor FOR

SELECT test_user_sys_id, ssn
FROM test_user
WHERE test_user_sys_id > p_id_min;

RETURN resultCursor;
END;
$BODY$
LANGUAGE plpgsql VOLATILE;


-localLaunchContext.xml (envirnoment-specific info):



<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://ift.tt/GArMu6"
xmlns:xsi="http://ift.tt/ra1lAU"
xsi:schemaLocation="
http://ift.tt/GArMu6 http://ift.tt/GAf8ZW">

<!-- Import the Spring Batch config files -->
<import resource="springBatchConfig.xml" />

<!-- Define the PostgreSQL source -->
<bean id="postgresql_dataSource"
class="org.springframework.jdbc.datasource.DriverManagerDataSource">
<property name="driverClassName" value="org.postgresql.Driver"/>
<property name="url" value="THE URL"/>
<property name="username" value="THE USER"/>
<property name="password" value="THE PASSWORD"/>
</bean>

<!-- Define a resourceless transaction manager for the in-memory job repository -->
<bean id="repositoryTransactionManager"
class="org.springframework.batch.support.transaction.ResourcelessTransactionManager" />

<!-- Define a transaction manager for PostgreSQL to hopefully make the cursor work -->
<bean id="transactionManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
<property name="dataSource" ref="postgresql_dataSource"/>
</bean>

<!-- Define in-memory job repository -->
<bean id="jobRepository" class="org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean">
<property name="transactionManager" ref="repositoryTransactionManager"/>
</bean>

<!-- Define the synchronous job launcher -->
<bean id="syncJobLauncher" class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
<property name="jobRepository" ref="jobRepository" />
</bean>
</beans>


-springBatchConfig.xml (Job info):



<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://ift.tt/GArMu6"
xmlns:batch="http://ift.tt/1fayA4Z"
xmlns:xsi="http://ift.tt/ra1lAU"
xmlns:util="http://ift.tt/OGfeTW"
xsi:schemaLocation="http://ift.tt/GArMu6
http://ift.tt/QEDs1e
http://ift.tt/1fayA4Z
http://ift.tt/1lPme4y
http://ift.tt/OGfeTW
http://ift.tt/1bFKsJT">

<!-- ================================================== -->
<!-- Components for TestUser Job -->
<!-- ================================================== -->

<!-- Test User Stored Procedure ItemReader -->
<bean id="testUserItemReader"
class="org.springframework.batch.item.database.StoredProcedureItemReader"
scope="step">
<property name="dataSource" ref="postgresql_dataSource" />
<property name="procedureName" value="get_user_func_no_arg" />
<property name="parameters">
<list>
<bean class="org.springframework.jdbc.core.SqlParameter">
<constructor-arg index="0" value="p_id_min " />
<constructor-arg index="1">
<util:constant static-field="java.sql.Types.NUMERIC" />
</constructor-arg>
</bean>
<bean class="org.springframework.jdbc.core.SqlOutParameter">
<constructor-arg index="0" value="resultCursor" />
<constructor-arg index="1">
<util:constant static-field="java.sql.Types.OTHER" />
</constructor-arg>
</bean>
</list>
</property>
<property name="refCursorPosition" value="2" />
<property name="rowMapper">
<bean class="dao.mapper.TestUserRowMapper" />
</property>
<property name="preparedStatementSetter" ref="preparedStatementSetter" />
</bean>

<!-- ItemProcessor is not needed, since the stored procedure provides the results -->

<!-- TestUser ItemWriter -->
<bean id="testUserItemWriter" class="org.springframework.batch.item.file.FlatFileItemWriter">
<property name="resource" value="file:output.txt" />
<property name="lineAggregator">
<bean class="org.springframework.batch.item.file.transform.PassThroughLineAggregator"/>
</property>
</bean>

<!-- TestUser Job definition -->
<batch:job id="TestUserJob" incrementer="jobParametersIncrementer">
<batch:step id="TestUser_step1">
<batch:tasklet>
<batch:transaction-attributes isolation="READ_COMMITTED" propagation="MANDATORY" timeout="200"/>
<batch:chunk reader="testUserItemReader"
writer="testUserItemWriter" commit-interval="2" />
</batch:tasklet>
</batch:step>
</batch:job>

<!-- ================================================== -->
<!-- Common Beans that are used in multiple scenarios -->
<!-- ================================================== -->

<!-- Increments the Job ID -->
<bean id="jobParametersIncrementer" class="org.springframework.batch.core.launch.support.RunIdIncrementer" />

<!-- Prepared statement setter to provide minimum user ID input to the PostgreSQl function. -->
<bean id="preparedStatementSetter" class="dao.setter.TestUserPreparedStatementSetter"
scope="step">
<property name="minId">
<value>#{jobParameters['minId']}</value>
</property>
</bean>
</beans>


-TestUser.java (ItemReader data object):



package domain;

// Data object to support a user
public class TestUser {

private int id;
private String ssn;
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public String getSsn() {
return ssn;
}
public void setSsn(String ssn) {
this.ssn = ssn;
}

@Override
public String toString() {
return "TestUser [id=" + id + ", ssn=" + ssn + "]";
}
}


-TestUserPreparedStatementSetter.java (Sets input parameters to PostgreSQL function):



package dao.setter;

import java.sql.CallableStatement;
import java.sql.PreparedStatement;
import java.sql.SQLException;

import org.springframework.jdbc.core.PreparedStatementSetter;

// Spring Batch PreparedStatementSetter to set parameters for stored procedures
public class TestUserPreparedStatementSetter implements PreparedStatementSetter {

private int minId;

// Setters
public void setMinId(int minId) {
this.minId = minId;
}

// Set the parameters for the stored procedure
public void setValues(PreparedStatement ps) throws SQLException {
CallableStatement cs = (CallableStatement) ps;
cs.setInt(1, this.minId);

cs.registerOutParameter(2, java.sql.Types.OTHER);
}
}


-TestUserRowMapper.java (Maps ItemReader cursor rows to TestUser data objects):



package dao.mapper;

import java.sql.ResultSet;
import java.sql.SQLException;

import org.springframework.jdbc.core.RowMapper;

import domain.TestUser;

//Spring RowMapper to support stored procedure results
public class TestUserRowMapper implements RowMapper<TestUser>{

// Map the results to the DAO
public TestUser mapRow(ResultSet resultSet, int rowNum) throws SQLException
{
TestUser resultData = new TestUser();
resultData.setId(resultSet.getInt(1));
resultData.setSsn(resultSet.getString(2));

return resultData;
}
}


-Invocation command:



java -classpath ".;lib\*;bin" org.springframework.batch.core.launch.support.CommandLineJobRunner localLaunchContext.xml TestUserJob minId=1


When I run this, I get the following error:



Caused by: org.postgresql.util.PSQLException: ERROR: cursor "resultCursor" does not exist
at org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2198)
at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:1927)
at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:255)
at org.postgresql.jdbc2.AbstractJdbc2Statement.execute(AbstractJdbc2Statement.java:561)
at org.postgresql.jdbc2.AbstractJdbc2Statement.executeWithFlags(AbstractJdbc2Statement.java:405)
at org.postgresql.jdbc2.AbstractJdbc2Connection.execSQLQuery(AbstractJdbc2Connection.java:363)
at org.postgresql.jdbc2.AbstractJdbc2ResultSet.internalGetObject(AbstractJdbc2ResultSet.java:207)
at org.postgresql.jdbc3.AbstractJdbc3ResultSet.internalGetObject(AbstractJdbc3ResultSet.java:36)
at org.postgresql.jdbc4.AbstractJdbc4ResultSet.internalGetObject(AbstractJdbc4ResultSet.java:300)
at org.postgresql.jdbc2.AbstractJdbc2ResultSet.getObject(AbstractJdbc2ResultSet.java:2704)
at org.postgresql.jdbc2.AbstractJdbc2Statement.executeWithFlags(AbstractJdbc2Statement.java:456)
at org.postgresql.jdbc2.AbstractJdbc2Statement.execute(AbstractJdbc2Statement.java:412)
at org.springframework.batch.item.database.StoredProcedureItemReader.openCursor(StoredProcedureItemReader.java:205)
... 29 more


I've gone back and forth on trying to have the REFCURSOR specified as an output parameter to the function and many other things, but without success. This is occuring using JDK 1.7, PostgreSQL JDBC driver JAR postgresql-9.3-1102.jdbc4, and PostgreSQL 8.2.15 (under Greenplum 4.2.8.1 build 2).


Any suggestions would be welcome. Thanks in advance.




Aucun commentaire:

Enregistrer un commentaire