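"""Generate PATSTAT citation metadata with PySpark.

Reads the PATSTAT application table plus the publication (tls211) and
citation (tls212) tables from parquet, resolves citations to DOCDB
family level, counts unique and within-family (auto) citations, and
writes the totals to ``metadata.parquet``.
"""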
from configparser import ConfigParser
from pathlib import Path

import pyspark.sql.functions as F
from pyspark.sql import SparkSession, Window


def gen_PT_metadata(spark):
    # Read the PATSTAT data directory from the local config file
    cf = ConfigParser()
    cf.read("config.cf")
    dir_db = Path(cf.get("data", "patstat"))
    #################################################
    #### Load info
    #################################################
    pt = spark.read.parquet(dir_db.joinpath("patstat_appln.parquet").as_posix())
    # Key bibliographical data elements relevant to identifying patent publications
    pt_pub = spark.read.parquet(dir_db.joinpath("tls211.parquet").as_posix()).select(
        "appln_id", "pat_publn_id", "publn_kind"
    )
    # Citation links between publications, applications and non-patent
    # literature documents
    pt_cit = spark.read.parquet(dir_db.joinpath("tls212.parquet").as_posix()).select(
        "pat_publn_id", "cited_pat_publn_id", "cited_appln_id", "cited_npl_publn_id"
    )
    cit_appln = pt_cit.select("pat_publn_id", "cited_appln_id").where(
        "cited_appln_id > 0"
    )
    cit_publn = pt_cit.select("pat_publn_id", "cited_pat_publn_id").where(
        "cited_pat_publn_id > 0"
    )
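    # A tls212 row cites either a publication, an application, or an NPL
    # document, hence the split into two frames above (NPL citations,
    # cited_npl_publn_id, are loaded but not used below).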
    #################################################
    #### Citation info
    #################################################
    # First, join appln_ids with docdb_family_id in publications
    pt_fam = pt.select(
        F.col("appln_id").alias("appln_id_f"), "docdb_family_id"
    )  # appln_id_f <-> docdb_family_id
    pub_fam = pt_pub.join(pt_fam, pt_pub.appln_id == pt_fam.appln_id_f, "left").drop(
        "appln_id_f"
    )  # pat_publn_id <-> appln_id <-> docdb_family_id
    # Then, convert citations through appln_ids to citations using docdb_family_id
    pbf_src = pub_fam.select(*(F.col(el).alias(el + "_src") for el in pub_fam.columns))
    pbf_dst = pub_fam.select(*(F.col(el).alias(el + "_dst") for el in pub_fam.columns))
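    # The _src/_dst suffixes disambiguate the two copies of pub_fam in the
    # joins below: _src columns describe the citing publication, _dst columns
    # the cited one.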
    # These are the publications citing other publications (with family identifiers)
    cit_publn_fam = (
        cit_publn.join(
            pbf_src, cit_publn.pat_publn_id == pbf_src.pat_publn_id_src, "left"
        )
        .join(pbf_dst, cit_publn.cited_pat_publn_id == pbf_dst.pat_publn_id_dst, "left")
        .drop("pat_publn_id", "cited_pat_publn_id")
        .select(
            "appln_id_src",
            "pat_publn_id_src",
            "publn_kind_src",
            "docdb_family_id_src",
            "appln_id_dst",
            "pat_publn_id_dst",
            "publn_kind_dst",
            "docdb_family_id_dst",
        )
        .withColumn(
            "autoCit", F.col("docdb_family_id_src") == F.col("docdb_family_id_dst")
        )
    )
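    # Illustrative (made-up) row: a publication of an application in family 55
    # cites a publication of another application in family 55, so autoCit is
    # True, i.e. a within-family self citation.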
    # These are the publications citing applications directly (with family identifiers)
    cit_appln_fam = (
        cit_appln.alias("df")
        .join(
            pbf_src.alias("df1"),
            F.col("df.pat_publn_id") == F.col("df1.pat_publn_id_src"),
            "left",
        )
        .join(
            pt_fam.select(
                "appln_id_f", F.col("docdb_family_id").alias("docdb_family_id_dst")
            ).alias("df2"),
            F.col("df.cited_appln_id") == F.col("df2.appln_id_f"),
            "left",
        )
        .drop("pat_publn_id_src", "appln_id_f")
        .select(
            "appln_id_src",
            "pat_publn_id",
            "publn_kind_src",
            "docdb_family_id_src",
            "cited_appln_id",
            "docdb_family_id_dst",
        )
        .withColumn(
            "autoCit", F.col("docdb_family_id_src") == F.col("docdb_family_id_dst")
        )
    )
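    # Cited applications are mapped to families through pt_fam rather than
    # pub_fam, presumably because a cited application need not have a
    # publication of its own.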
    # cit_fam.write.parquet(
    #     "/export/usuarios01/joseantem/Proyectos/out/fam_citations.parquet",
    #     mode="overwrite",
    # )
    # Concat cit_publn_fam and cit_appln_fam
    full_cit = cit_publn_fam.select(
        "docdb_family_id_src", "docdb_family_id_dst", "autoCit"
    ).unionByName(
        cit_appln_fam.select("docdb_family_id_src", "docdb_family_id_dst", "autoCit")
    )
    # Get unique total citations (one row per src/dst family pair)
    window = Window.partitionBy("docdb_family_id_src", "docdb_family_id_dst").orderBy(
        "docdb_family_id_src", "docdb_family_id_dst"
    )
    full_cit_un = (
        full_cit.withColumn("rank", F.row_number().over(window))
        .filter(F.col("rank") == 1)
        .drop("rank")
    )
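    # Note: full_cit.dropDuplicates(["docdb_family_id_src", "docdb_family_id_dst"])
    # would be an equivalent, simpler way to express this dedup, since autoCit
    # is fully determined by the two family columns.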
    # Auto citations: source and cited publication share a DOCDB family
    auto_cit = full_cit_un.where("autoCit")
    #################################################
    #### Get counts
    #################################################
    tot_cit_appln = cit_appln_fam.count()
    tot_cit_publn = cit_publn_fam.count()
    tot_full_cit = full_cit.count()
    full_cit_unique = full_cit_un.count()
    tot_auto_cit = auto_cit.count()
    print("Citations by DOCDB family")
    print(f"Number of citations (applications): {tot_cit_appln}")
    print(f"Number of citations (publications): {tot_cit_publn}")
    print(f"Number of combined citations: {tot_full_cit}")
    print(
        f"Number of unique citations: {full_cit_unique}"
        f" ({full_cit_unique / tot_full_cit * 100:.3f}%)"
    )
    # The auto-citation share is reported relative to the unique citation count
    print(
        f"Number of auto citations: {tot_auto_cit}"
        f" ({tot_auto_cit / full_cit_unique * 100:.3f}%)"
    )
    #################################################
    #### Output data
    #################################################
    columns = ["Num_cit", "Num_cit_wo_self"]
    row = [full_cit_unique, full_cit_unique - tot_auto_cit]
    data = [row]
    df = spark.createDataFrame(data=data, schema=columns)
    df.printSchema()
    df.show(truncate=False)
    df.write.parquet(
        dir_db.joinpath("metadata.parquet").as_posix(),
        mode="overwrite",
    )
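    # To inspect the result later, something along these lines should work:
    #   spark.read.parquet(dir_db.joinpath("metadata.parquet").as_posix()).show()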
if __name__ == "__main__":
    # Create session and run the pipeline
    spark = SparkSession.builder.appName("WP3pipeline").getOrCreate()
    sc = spark.sparkContext
    print(sc.version)
    gen_PT_metadata(spark)
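# Typical invocation (assumed; not documented in the repo):
#   spark-submit PATSTAT.py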