From fca5896923c206c82b4c0628e6de24392e626cf5 Mon Sep 17 00:00:00 2001 From: Tomas Kacur Date: Thu, 3 Oct 2019 22:36:43 +0200 Subject: [PATCH 1/2] distribute query path to every row --- src/keboola/facebook/api/request.clj | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/keboola/facebook/api/request.clj b/src/keboola/facebook/api/request.clj index 6db9997..194c874 100644 --- a/src/keboola/facebook/api/request.clj +++ b/src/keboola/facebook/api/request.clj @@ -143,7 +143,7 @@ (defn page-and-collect "collect data from response and make another paging requests if needed. Returns lazy sequence of flattened data resulting from processing the whole query" - [{:keys [ex-account-id parent-id fb-graph-node table-name body-data response] :as init-params}] + [{:keys [ex-account-id parent-id fb-graph-node table-name path body-data response] :as init-params}] ((fn step [params this-object-data rest-objects top-node] (if (and (empty? rest-objects) (empty? this-object-data)) nil @@ -158,6 +158,7 @@ :parent-id (:parent-id next-object) :fb-graph-node (:fb-graph-node next-object) :table-name (:name next-object) + :path path :response (:data next-object) :body-data (:data (:data next-object)))] (lazy-seq (cons new-rows (step new-params (:body-data new-params) (rest all-objects) top-node)))))) init-params body-data [] fb-graph-node)) @@ -184,6 +185,7 @@ :parent-id (name (first %)) :fb-graph-node "page" :table-name "page" + :path path :body-data [(if (not-empty path) {sanitized-path (second %)} (second %))] :response response-body}) @@ -195,6 +197,7 @@ :parent-id "" :fb-graph-node "page" :table-name "page" + :path path :body-data [(if (not-empty path) {sanitized-path response-body} response-body)] :response (if (not-empty path) {sanitized-path response-body} response-body)})))) From 300cc778ed419b3ee8703804f13bb14239cc0efd Mon Sep 17 00:00:00 2001 From: Tomas Kacur Date: Thu, 3 Oct 2019 22:37:22 +0200 Subject: [PATCH 2/2] add ad_id to pk if path is empty --- src/keboola/facebook/extractor/output.clj | 20 ++++++++++------ .../facebook/extractor/output_test.clj | 23 +++++++++++-------- 2 files changed, 26 insertions(+), 17 deletions(-) diff --git a/src/keboola/facebook/extractor/output.clj b/src/keboola/facebook/extractor/output.clj index 0f73c05..7aee0e8 100644 --- a/src/keboola/facebook/extractor/output.clj +++ b/src/keboola/facebook/extractor/output.clj @@ -31,11 +31,16 @@ {"insights" ["age" "country" "dma" "gender" "frequency_value" "hourly_stats_aggregated_by_advertiser_time_zone" "hourly_stats_aggregated_by_audience_time_zone" "impression_device" "place_page_id" "placement" "publisher_platform" "platform_position" "device_platform" "product_id" "region"] "ratings" ["reviewer_id"]}) -(defn get-primary-key [table-columns table-name] - (let [basic-pk ["parent_id"] +(def ENDPOINT-SPECIFIC-PK-MAP + {"" ["ad_id"]}) + +(defn get-primary-key [table-columns table-name context] + (let [endpoint (:path context) + basic-pk ["parent_id"] all-tables-pk ["id" "key1" "key2" "end_time" "account_id" "campaign_id" "date_start" "date_stop" "ads_action_name" "action_type" "action_reaction"] table-only-pk (TABLES-SPECIFIC-PK-MAP table-name #{}) - extended-pk (concat all-tables-pk table-only-pk)] + endpoint-only-pk (ENDPOINT-SPECIFIC-PK-MAP endpoint #{}) + extended-pk (concat all-tables-pk table-only-pk endpoint-only-pk)] (concat basic-pk (filter (fn [column] (some #(= % (keyword column)) table-columns)) @@ -53,9 +58,9 @@ (def columns-map (atom {})) (defn reset-columns-map [] (reset! columns-map {})) -(defn write-manifest [manifest-path columns is-write? table-name] +(defn write-manifest [manifest-path columns is-write? table-name context] (if is-write? - (let [manifest {:incremental true :primary_key (get-primary-key columns table-name) :columns columns}] + (let [manifest {:incremental true :primary_key (get-primary-key columns table-name context) :columns columns}] (if (not (contains? @columns-map manifest-path)) (do (runtime/save-manifest manifest-path manifest) @@ -83,10 +88,11 @@ (let [first-write? (:first-write? memo) header (if first-write? (prepare-header (:buffer memo) manifest-path) - (:header memo))] + (:header memo)) + context (-> memo :buffer first :keboola)] (if header (do - (write-manifest manifest-path header first-write? table-name) + (write-manifest manifest-path header first-write? table-name context) (csv/write-to-file csv-file header (:buffer memo) false) (if (= (mod (:cnt memo) (* chan-buffer-size 20)) 0) (runtime/log-strings "Written" (:cnt memo) "rows to" table-name)) diff --git a/test/keboola/facebook/extractor/output_test.clj b/test/keboola/facebook/extractor/output_test.clj index 750bd48..75c2b5d 100644 --- a/test/keboola/facebook/extractor/output_test.clj +++ b/test/keboola/facebook/extractor/output_test.clj @@ -8,13 +8,16 @@ (is (= (set value) (set complete-expected))))) (deftest test-primary-key - (test-pk (sut/get-primary-key ["id"] "") []) - (test-pk (sut/get-primary-key [:id] "") ["id"]) - (test-pk (sut/get-primary-key ["foo"] "") []) - (test-pk (sut/get-primary-key [:foo] "") []) - (test-pk (sut/get-primary-key [:id :key] "") ["id"]) - (test-pk (sut/get-primary-key [:id :key1 :foo :key2] "insights") ["id" "key1" "key2"]) - (test-pk (sut/get-primary-key [:id :placement :device_platform :foo :key2 :age] "insights") ["id" "placement" "key2" "age" "device_platform"]) - (test-pk (sut/get-primary-key [:id :placement :foo :key2 :age :region :country :gender] "asdasfoo") ["id" "key2"]) - (test-pk (sut/get-primary-key [:id :key1 :foo :key2] "") ["id" "key1" "key2"]) - (test-pk (sut/get-primary-key [:id :key1 :foo :key2 :reviewer_id] "ratings") ["id" "key1" "key2" "reviewer_id"])) + (test-pk (sut/get-primary-key ["id"] "" {}) []) + (test-pk (sut/get-primary-key [:id] "" {}) ["id"]) + (test-pk (sut/get-primary-key ["foo"] "" {}) []) + (test-pk (sut/get-primary-key [:foo] "" {}) []) + (test-pk (sut/get-primary-key [:id :key] "" {}) ["id"]) + (test-pk (sut/get-primary-key [:id :key1 :foo :key2] "insights" {}) ["id" "key1" "key2"]) + (test-pk (sut/get-primary-key [:id :placement :device_platform :foo :key2 :age] "insights" {}) ["id" "placement" "key2" "age" "device_platform"]) + (test-pk (sut/get-primary-key [:id :placement :foo :key2 :age :region :country :gender] "asdasfoo" {}) ["id" "key2"]) + (test-pk (sut/get-primary-key [:id :key1 :foo :key2] "" {}) ["id" "key1" "key2"]) + (test-pk (sut/get-primary-key [:id :key1 :foo :key2] "" {:path ""}) ["id" "key1" "key2"]) + (test-pk (sut/get-primary-key [:id :key1 :foo :key2 :ad_id] "" {:path ""}) ["id" "key1" "key2" "ad_id"]) + (test-pk (sut/get-primary-key [:id :key1 :foo :key2 :ad_id] "" {:path "ads"}) ["id" "key1" "key2"]) + (test-pk (sut/get-primary-key [:id :key1 :foo :key2 :reviewer_id] "ratings" {}) ["id" "key1" "key2" "reviewer_id"]))