
Unable to read delete/update data #74

Open
Wangddawei opened this issue Mar 4, 2021 · 12 comments

Comments

@Wangddawei

Kafka-connect-SAP synchronize data from HANA to Kafka, but the test result is only read data after INSERT operation, DELETE and UPDATE are not able to read data, Kafka can not read data. I see the code you wrote, there is a Query mode in it, as if it can only read the data in HANA, other operations can not get the data, is that right? Or am I missing something?

@elakito
Collaborator

elakito commented Mar 4, 2021

@Wangddawei you are right. The hana source connector can only handle only the bulk (the entire table at once) or the incrementing (the newly inserted records) fetching. In other words, there is no CDC (i.e., fetching the insert, update, delete events) version of the source connector available at the moment due to some questions that need to be clarified. In contrast, the hana sink connector can handle both update and delete.

We will keep this ticket open to track the status of the source connector's update/delete handling.
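The difference between the two supported fetch modes can be sketched roughly as follows. This is a minimal simulation using sqlite3 in place of HANA; the table, column names, and offset value are made up and not part of the connector:

```python
import sqlite3

# Hypothetical table standing in for a HANA source table.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (id INTEGER, payload TEXT)")
conn.executemany("INSERT INTO orders VALUES (?, ?)",
                 [(1, "a"), (2, "b"), (3, "c")])

# Bulk mode: every poll re-reads the entire table.
bulk = conn.execute("SELECT id, payload FROM orders ORDER BY id").fetchall()

# Incrementing mode: only rows whose incrementing column exceeds the
# last stored offset are fetched.  Updates and deletes of rows at or
# below the offset never satisfy this predicate, so they are invisible
# to the connector -- which is why only INSERTs show up in Kafka.
last_offset = 2  # offset recorded after the previous poll
delta = conn.execute(
    "SELECT id, payload FROM orders WHERE id > ? ORDER BY id",
    (last_offset,)).fetchall()
```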

@Wangddawei
Author

Can the HANA-to-Kafka data flow be configured with only a source file? I didn't configure a sink file; as I understand it, the sink file only configures the Kafka -> HANA direction, i.e. one-way synchronization.

@elakito
Collaborator

elakito commented Mar 8, 2021

@Wangddawei Yes. The source and sink connectors are independent; each can be configured on its own and combined with other source or sink connectors.

@Wangddawei
Author

thank you very much!

@garethdebcor

We have the connector set up, but incrementing mode is not allowing us to read new records. Example: the MARA table, which holds all the material/SKU records. When a new record is created, how do you set up the incrementing column to pick up the new record? The incrementing column apparently needs to be a numeric/integer-based column, but the key field on MARA is MATNR, which is CHAR40. Is there any trick to getting a table to download the deltas? Or do I need to create a CDS view which auto-creates an indexing column?

@elakito
Collaborator

elakito commented Jun 11, 2021

@garethdebcor The incrementing column can be of a string-based type. The only requirement is that your source table is modified only by inserts.

{topic}.incrementing.column.name - ... The type of the column can be numeric types such as INTEGER, FLOAT, DECIMAL, datetime types such as DATE, TIME, TIMESTAMP, and character types VARCHAR, NVARCHAR containing alpha-numeric characters. ...

Do you see some error when you specify this column as the incrementing column in your connector configuration?
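One subtlety with a character-typed incrementing column is that the offset comparison is lexicographic, so a newly inserted value is fetched only if it sorts strictly above the highest value already seen. A small illustration (the carrier-code values are hypothetical):

```python
# The offset on a character column advances to the highest value
# seen so far; comparison is plain string ordering.
seen = ["ABCD", "FEDX", "UPSN"]
last_offset = max(seen)          # 'UPSN'

# Hypothetical newly inserted CARRIER_SCAC values.
new_rows = ["GSOX", "USPX"]
fetched = [v for v in new_rows if v > last_offset]
# 'GSOX' sorts below 'UPSN', so it is silently skipped;
# only 'USPX' would reach Kafka.
```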

@garethdebcor

We actually don't see any error. We just see the initial load happen (14k records). Then we add a material and the logs show that the connector is working - but nothing happens (no new records show up).

Thanks, Gareth

@garethdebcor

Here is our config file for reference. Do you see anything out of the ordinary? This is a small custom table with only 9 entries.

name=test-topic-source
connector.class=com.sap.kafka.connect.source.hana.HANASourceConnector
tasks.max=1
topics=ztest_last
connection.url=jdbc:sap://sap.xxxxx..xxxx.xxxx.xxxx
connection.user=USER
connection.password=PASSWORD
mode=incrementing
ztest_last.incrementing.column.name=CARRIER_SCAC
ztest_last.table.name="SAPABAP1"."ZCVY_LAST_MILE"
numeric.mapping=best_fit

Here is a picture of the table

[screenshot of the table, 2021-06-11, omitted]

@garethdebcor

My one pet peeve is when someone posts a problem but never posts the solution. So: we ended up solving this with a slight config change, which makes logical sense after the fact. We swapped the lines declaring the incrementing column and the table name, and the new records now show up. It makes sense from a programming standpoint as well: declare the structure first, then declare the field of the structure.

Now we move on to the delete / update task.

(working config)
name=test-topic-source
connector.class=com.sap.kafka.connect.source.hana.HANASourceConnector
tasks.max=1
topics=ztest_last
connection.url=jdbc:sap://sap.xxxxx..xxxx.xxxx.xxxx
connection.user=USER
connection.password=PASSWORD
mode=incrementing
ztest_last.table.name="SAPABAP1"."ZCVY_LAST_MILE"
ztest_last.incrementing.column.name=CARRIER_SCAC
numeric.mapping=best_fit

@elakito
Collaborator

elakito commented Jun 14, 2021

@garethdebcor Thanks for investigating the problem. It is strange that the order of these two lines changed the behavior, because those properties are read sequentially into a map and then accessed by key, so their order should not matter.

@Mahi4089

Mahi4089 commented Dec 13, 2021

We have the HANA connector set up and the initial (full) load is successful, but incrementing mode is not allowing us to read new records. Example: for the ZCVY_LAST_MILE table, when a new record is created, the incrementing column does not pick it up. The connector just polls, reports no incrementing-column updates, and logs no errors.

Here are the properties currently being used.

name=dev-topic-source_zcvy_1
connector.class=com.sap.kafka.connect.source.hana.HANASourceConnector
tasks.max=1
topics=dev_zcvy_1
connection.url=jdbc:sap://XXXXXXXXXXX
connection.user=XXXXXX
connection.password=XXXXXXXX
mode=incrementing
dev_zcvy_1.table.name="SAPABAP1"."ZCVY_LAST_MILE"
dev_zcvy_1.incrementing.column.name=CARRIER_SCAC
numeric.mapping=best_fit

@elakito
Collaborator

elakito commented Dec 17, 2021

@Mahi4089 That sounds like some data in the initial load has a higher incrementing value than the newly added data.
Maybe you could run an SQL query:
SELECT * FROM SAPABAP1.ZCVY_LAST_MILE WHERE CARRIER_SCAC > 'abcdef';
where 'abcdef' is the current highest incrementing value. Then insert some data with a higher incrementing value than 'abcdef' and run the same query again:
SELECT * FROM SAPABAP1.ZCVY_LAST_MILE WHERE CARRIER_SCAC > 'abcdef';
This should return the data that you just inserted. If it doesn't, the table contains data with a higher incrementing value than 'abcdef', and any new records sorting below that value will never be picked up.
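The check described above can be simulated end to end (sqlite3 standing in for HANA; the table contents are made up):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE last_mile (CARRIER_SCAC TEXT)")
conn.executemany("INSERT INTO last_mile VALUES (?)",
                 [("AAAA",), ("MMMM",), ("ZZZZ",)])

# The connector's stored offset ends up at the highest existing value.
highest = conn.execute(
    "SELECT MAX(CARRIER_SCAC) FROM last_mile").fetchone()[0]  # 'ZZZZ'

# A row inserted below the current offset is never returned by the
# incrementing query, so it silently never reaches Kafka ...
conn.execute("INSERT INTO last_mile VALUES ('GGGG')")
below = conn.execute(
    "SELECT CARRIER_SCAC FROM last_mile WHERE CARRIER_SCAC > ?",
    (highest,)).fetchall()

# ... while a row sorting above the offset is fetched as expected.
conn.execute("INSERT INTO last_mile VALUES ('ZZZZA')")
above = conn.execute(
    "SELECT CARRIER_SCAC FROM last_mile WHERE CARRIER_SCAC > ?",
    (highest,)).fetchall()
```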
