まひろ量子のハックログ

プログラミングや機械学習などの知識を記録・共有します

正規表現をFSTに変換してOpenFSTで扱えるようにする

FSTとは

Finite-state Transducerの略。オートマトンの一種で、複数のノードとアークから成る。

入力系列に対し、その入力系列が受領可能か受領不可能かを返す。受領可能だった場合は、同時に出力系列も返す。 出力系列の候補が2通りあった場合、重みの大小によって出力の優先度が決まる。このような重みが存在するFSTをWFST(Weighted Finite-state Transducer)という。

WFSTを扱うときに便利なOSSがある。OpenFSTというツールだ。

以下の記事でも紹介したので詳しくはコチラを御覧いただきたい。

www.mahirokazuko.com

正規表現をFST化する

FSTの教科書を読むと、「正規表現も実はFSTで表せるよー」とよく書いてある。しかし、正規表現をFSTに変換してくれるツールを探してみても、意外とほとんど見つからなかった。唯一見つかったのがコチラのコード。python版とc++版がある。

GitHub - siddharthasahu/automata-from-regex: A python program to build nfa, dfa and minimised DFA from given regular expression. Uses Tkinter for GUI and GraphViz for graphs.

ただし、このコードはアルファベットをもちいた正規表現にのみ対応しており、そのままでは日本語を扱うことができない。そこで、一部を改造して日本語に対応できるようにした。改造したのはAutomataTheory.pyという名前のソースだ。ライセンスはGPL lv3とする。 (ついでにpython3で動くようにした。)

AutomataTheory.py (LICENSE: GNU GPL lv3)

from os import popen
import time

class Automata:
    """class to represent an Automata"""

    def __init__(self, language = set(['0', '1'])):
        self.states = set()
        self.startstate = None
        self.finalstates = []
        self.transitions = dict()
        self.language = language

    @staticmethod
    def epsilon():
        return ":e:"

    def setstartstate(self, state):
        self.startstate = state
        self.states.add(state)

    def addfinalstates(self, state):
        if isinstance(state, int):
            state = [state]
        for s in state:
            if s not in self.finalstates:
                self.finalstates.append(s)

    def addtransition(self, fromstate, tostate, inp):
        if isinstance(inp, str):
            inp = set([inp])
        self.states.add(fromstate)
        self.states.add(tostate)
        if fromstate in self.transitions:
            if tostate in self.transitions[fromstate]:
                self.transitions[fromstate][tostate] = self.transitions[fromstate][tostate].union(inp)
            else:
                self.transitions[fromstate][tostate] = inp
        else:
            self.transitions[fromstate] = {tostate : inp}

    def addtransition_dict(self, transitions):
        for fromstate, tostates in transitions.items():
            for state in tostates:
                self.addtransition(fromstate, state, tostates[state])

    def gettransitions(self, state, key):
        if isinstance(state, int):
            state = [state]
        trstates = set()
        for st in state:
            if st in self.transitions:
                for tns in self.transitions[st]:
                    if key in self.transitions[st][tns]:
                        trstates.add(tns)
        return trstates

    def getEClose(self, findstate):
        allstates = set()
        states = set([findstate])
        while len(states)!= 0:
            state = states.pop()
            allstates.add(state)
            if state in self.transitions:
                for tns in self.transitions[state]:
                    if Automata.epsilon() in self.transitions[state][tns] and tns not in allstates:
                        states.add(tns)
        return allstates

    def display(self):
        print ("states:", self.states)
        print ("start state: ", self.startstate)
        print ("final states:", self.finalstates)
        print ("transitions:")
        for fromstate, tostates in self.transitions.items():
            for state in tostates:
                for char in tostates[state]:
                    print ("#", fromstate, state, char, char, "1")
            print ()
        for finalstate in self.finalstates:
            print ('#', finalstate)

    def getPrintText(self):
        text = "language: {" + ", ".join(self.language) + "}\n"
        text += "states: {" + ", ".join(map(str,self.states)) + "}\n"
        text += "start state: " + str(self.startstate) + "\n"
        text += "final states: {" + ", ".join(map(str,self.finalstates)) + "}\n"
        text += "transitions:\n"
        linecount = 5
        for fromstate, tostates in self.transitions.items():
            for state in tostates:
                for char in tostates[state]:
                    text += "    " + str(fromstate) + " -> " + str(state) + " on '" + char + "'\n"
                    linecount +=1
        return [text, linecount]

    def newBuildFromNumber(self, startnum):
        translations = {}
        for i in list(self.states):
            translations[i] = startnum
            startnum += 1
        rebuild = Automata(self.language)
        rebuild.setstartstate(translations[self.startstate])
        rebuild.addfinalstates(translations[self.finalstates[0]])
        for fromstate, tostates in self.transitions.items():
            for state in tostates:
                rebuild.addtransition(translations[fromstate], translations[state], tostates[state])
        return [rebuild, startnum]

    def newBuildFromEquivalentStates(self, equivalent, pos):
        rebuild = Automata(self.language)
        for fromstate, tostates in self.transitions.items():
            for state in tostates:
                rebuild.addtransition(pos[fromstate], pos[state], tostates[state])
        rebuild.setstartstate(pos[self.startstate])
        for s in self.finalstates:
            rebuild.addfinalstates(pos[s])
        return rebuild

    def getDotFile(self):
        dotFile = "digraph DFA {\nrankdir=LR\n"
        if len(self.states) != 0:
            dotFile += "root=s1\nstart [shape=point]\nstart->s%d\n" % self.startstate
            for state in self.states:
                if state in self.finalstates:
                    dotFile += "s%d [shape=doublecircle]\n" % state
                else:
                    dotFile += "s%d [shape=circle]\n" % state
            for fromstate, tostates in self.transitions.items():
                for state in tostates:
                    for char in tostates[state]:
                        dotFile += 's%d->s%d [label="%s"]\n' % (fromstate, state, char)
        dotFile += "}"
        return dotFile

class BuildAutomata:
    """class for building e-nfa basic structures"""

    @staticmethod
    def basicstruct(inp):
        state1 = 1
        state2 = 2
        basic = Automata()
        basic.setstartstate(state1)
        basic.addfinalstates(state2)
        basic.addtransition(1, 2, inp)
        return basic

    @staticmethod
    def plusstruct(a, b):
        [a, m1] = a.newBuildFromNumber(2)
        [b, m2] = b.newBuildFromNumber(m1)
        state1 = 1
        state2 = m2
        plus = Automata()
        plus.setstartstate(state1)
        plus.addfinalstates(state2)
        plus.addtransition(plus.startstate, a.startstate, Automata.epsilon())
        plus.addtransition(plus.startstate, b.startstate, Automata.epsilon())
        plus.addtransition(a.finalstates[0], plus.finalstates[0], Automata.epsilon())
        plus.addtransition(b.finalstates[0], plus.finalstates[0], Automata.epsilon())
        plus.addtransition_dict(a.transitions)
        plus.addtransition_dict(b.transitions)
        return plus

    @staticmethod
    def dotstruct(a, b):
        [a, m1] = a.newBuildFromNumber(1)
        [b, m2] = b.newBuildFromNumber(m1)
        state1 = 1
        state2 = m2-1
        dot = Automata()
        dot.setstartstate(state1)
        dot.addfinalstates(state2)
        dot.addtransition(a.finalstates[0], b.startstate, Automata.epsilon())
        dot.addtransition_dict(a.transitions)
        dot.addtransition_dict(b.transitions)
        return dot

    @staticmethod
    def starstruct(a):
        [a, m1] = a.newBuildFromNumber(2)
        state1 = 1
        state2 = m1
        star = Automata()
        star.setstartstate(state1)
        star.addfinalstates(state2)
        star.addtransition(star.startstate, a.startstate, Automata.epsilon())
        star.addtransition(star.startstate, star.finalstates[0], Automata.epsilon())
        star.addtransition(a.finalstates[0], star.finalstates[0], Automata.epsilon())
        star.addtransition(a.finalstates[0], a.startstate, Automata.epsilon())
        star.addtransition_dict(a.transitions)
        return star


class DFAfromNFA:
    """class for building dfa from e-nfa and minimise it"""

    def __init__(self, nfa):
        self.buildDFA(nfa)
        self.minimise()

    def getDFA(self):
        return self.dfa

    def getMinimisedDFA(self):
        return self.minDFA

    def displayDFA(self):
        self.dfa.display()

    def displayMinimisedDFA(self):
        self.minDFA.display()

    def buildDFA(self, nfa):
        allstates = dict()
        eclose = dict()
        count = 1
        state1 = nfa.getEClose(nfa.startstate)
        eclose[nfa.startstate] = state1
        dfa = Automata(nfa.language)
        dfa.setstartstate(count)
        states = [[state1, count]]
        allstates[count] = state1
        count +=  1
        while len(states) != 0:
            [state, fromindex] = states.pop()
            for char in dfa.language:
                trstates = nfa.gettransitions(state, char)
                for s in list(trstates)[:]:
                    if s not in eclose:
                        eclose[s] = nfa.getEClose(s)
                    trstates = trstates.union(eclose[s])
                if len(trstates) != 0:
                    if trstates not in allstates.values():
                        states.append([trstates, count])
                        allstates[count] = trstates
                        toindex = count
                        count +=  1
                    else:
                        toindex = [k for k, v in allstates.items() if v  ==  trstates][0]
                    dfa.addtransition(fromindex, toindex, char)
        for value, state in allstates.items():
            if nfa.finalstates[0] in state:
                dfa.addfinalstates(value)
        self.dfa = dfa

    def acceptsString(self, string):
        currentstate = self.dfa.startstate
        for ch in string:
            if ch==":e:":
                continue
            st = list(self.dfa.gettransitions(currentstate, ch))
            if len(st) == 0:
                return False
            currentstate = st[0]
        if currentstate in self.dfa.finalstates:
            return True
        return False

    def minimise(self):
        states = list(self.dfa.states)
        n = len(states)
        unchecked = dict()
        count = 1
        distinguished = []
        equivalent = dict(zip(range(len(states)), [{s} for s in states]))
        pos = dict(zip(states,range(len(states))))
        for i in range(n-1):
            for j in range(i+1, n):
                if not ([states[i], states[j]] in distinguished or [states[j], states[i]] in distinguished):
                    eq = 1
                    toappend = []
                    for char in self.dfa.language:
                        s1 = self.dfa.gettransitions(states[i], char)
                        s2 = self.dfa.gettransitions(states[j], char)
                        if len(s1) != len(s2):
                            eq = 0
                            break
                        if len(s1) > 1:
                            raise BaseException("Multiple transitions detected in DFA")
                        elif len(s1) == 0:
                            continue
                        s1 = s1.pop()
                        s2 = s2.pop()
                        if s1 != s2:
                            if [s1, s2] in distinguished or [s2, s1] in distinguished:
                                eq = 0
                                break
                            else:
                                toappend.append([s1, s2, char])
                                eq = -1
                    if eq == 0:
                        distinguished.append([states[i], states[j]])
                    elif eq == -1:
                        s = [states[i], states[j]]
                        s.extend(toappend)
                        unchecked[count] = s
                        count += 1
                    else:
                        p1 = pos[states[i]]
                        p2 = pos[states[j]]
                        if p1 != p2:
                            st = equivalent.pop(p2)
                            for s in st:
                                pos[s] = p1
                            equivalent[p1] = equivalent[p1].union(st)
        newFound = True
        while newFound and len(unchecked) > 0:
            newFound = False
            toremove = set()
            for p, pair in list(unchecked.items()):
                for tr in pair[2:]:
                    if [tr[0], tr[1]] in distinguished or [tr[1], tr[0]] in distinguished:
                        unchecked.pop(p)
                        distinguished.append([pair[0], pair[1]])
                        newFound = True
                        break
        for pair in unchecked.values():
            p1 = pos[pair[0]]
            p2 = pos[pair[1]]
            if p1 != p2:
                st = equivalent.pop(p2)
                for s in st:
                    pos[s] = p1
                equivalent[p1] = equivalent[p1].union(st)
        if len(equivalent) == len(states):
            self.minDFA = self.dfa
        else:
            self.minDFA = self.dfa.newBuildFromEquivalentStates(equivalent, pos)

class NFAfromRegex:
    """class for building e-nfa from regular expressions"""

    def __init__(self, regex):
        self.star = '*'
        self.plus = '+'
        self.dot = '.'
        self.openingBracket = '('
        self.closingBracket = ')'
        self.operators = [self.plus, self.dot]
        self.regex = regex
        self.alphabet = []
        self.alphabet = [c.strip() for c in open('charlist')]
        self.buildNFA()

    def getNFA(self):
        return self.nfa

    def displayNFA(self):
        self.nfa.display()

    def buildNFA(self):
        language = set()
        self.stack = []
        self.automata = []
        previous = "::e::"
        for char in self.regex:
            if char in self.alphabet:
                language.add(char)
                if previous != self.dot and (previous in self.alphabet or previous in [self.closingBracket,self.star]):
                    self.addOperatorToStack(self.dot)
                self.automata.append(BuildAutomata.basicstruct(char))
            elif char  ==  self.openingBracket:
                if previous != self.dot and (previous in self.alphabet or previous in [self.closingBracket,self.star]):
                    self.addOperatorToStack(self.dot)
                self.stack.append(char)
            elif char  ==  self.closingBracket:
                if previous in self.operators:
                    raise BaseException("Error processing '%s' after '%s'" % (char, previous))
                while(1):
                    if len(self.stack) == 0:
                        raise BaseException("Error processing '%s'. Empty stack" % char)
                    o = self.stack.pop()
                    if o == self.openingBracket:
                        break
                    elif o in self.operators:
                        self.processOperator(o)
            elif char == self.star:
                if previous in self.operators or previous  == self.openingBracket or previous == self.star:
                    raise BaseException("Error processing '%s' after '%s'" % (char, previous))
                self.processOperator(char)
            elif char in self.operators:
                if previous in self.operators or previous  == self.openingBracket:
                    raise BaseException("Error processing '%s' after '%s'" % (char, previous))
                else:
                    self.addOperatorToStack(char)
            else:
                raise BaseException("Symbol '%s' is not allowed" % char)
            previous = char
        while len(self.stack) != 0:
            op = self.stack.pop()
            self.processOperator(op)
        if len(self.automata) > 1:
            print (self.automata)
            raise BaseException("Regex could not be parsed successfully")
        self.nfa = self.automata.pop()
        self.nfa.language = language

    def addOperatorToStack(self, char):
        while(1):
            if len(self.stack) == 0:
                break
            top = self.stack[len(self.stack)-1]
            if top == self.openingBracket:
                break
            if top == char or top == self.dot:
                op = self.stack.pop()
                self.processOperator(op)
            else:
                break
        self.stack.append(char)

    def processOperator(self, operator):
        if len(self.automata) == 0:
            raise BaseException("Error processing operator '%s'. Stack is empty" % operator)
        if operator == self.star:
            a = self.automata.pop()
            self.automata.append(BuildAutomata.starstruct(a))
        elif operator in self.operators:
            if len(self.automata) < 2:
                raise BaseException("Error processing operator '%s'. Inadequate operands" % operator)
            a = self.automata.pop()
            b = self.automata.pop()
            if operator == self.plus:
                self.automata.append(BuildAutomata.plusstruct(b,a))
            elif operator == self.dot:
                self.automata.append(BuildAutomata.dotstruct(b,a))

def drawGraph(automata, file = ""):
    """From https://github.com/max99x/automata-editor/blob/master/util.py"""
    f = popen(r"dot -Tpng -o graph%s.png" % file, 'w')
    try:
        f.write(automata.getDotFile())
    except:
        raise BaseException("Error creating graph")
    finally:
        f.close()

def isInstalled(program):
    """From http://stackoverflow.com/questions/377017/test-if-executable-exists-in-python"""
    import os
    def is_exe(fpath):
        return os.path.isfile(fpath) and os.access(fpath, os.X_OK)
    fpath, fname = os.path.split(program)
    if fpath:
        if is_exe(program) or is_exe(program+".exe"):
            return True
    else:
        for path in os.environ["PATH"].split(os.pathsep):
            exe_file = os.path.join(path, program)
            if is_exe(exe_file) or is_exe(exe_file+".exe"):
                return True
    return False

ここで、日本語対応するにあたり、このツールで扱えるようにする文字群をcharlistというファイルで定義するようにした。charlistは「1文字が1行に並んだテキストファイル」とする。以下に例を示す。

charlist

a
b
c
d
e
f
g
h
i
j
・
・
(省略)
・
・
枠
鷲
亙
亘
鰐
詫
藁
蕨
椀
湾
碗
腕
々
ー
/
+
−

このcharlistAutomataTheory.pyと同じ階層に置いておく。

では早速、適当な正規表現をWFST化してみよう。以下のように引数に正規表現を受け取り、標準出力でWFSTの情報を出力するpythonコードを用意。

convert-regex2fst.py

from AutomataTheory import *
import sys

def main():
    if len(sys.argv)>1:
        inp = sys.argv[1]
    else:
        return -1;

    print ("Regular Expression: ", inp)
    nfaObj = NFAfromRegex(inp)
    nfa = nfaObj.getNFA()
    dfaObj = DFAfromNFA(nfa)
    dfa = dfaObj.getDFA()
    minDFA = dfaObj.getMinimisedDFA()

    #print "\nNFA: "
    #nfaObj.displayNFA()

    #print ("\nDFA: ")
    #dfaObj.displayDFA()

    print ("\nMinimised DFA: ")
    dfaObj.displayMinimisedDFA()

if __name__  ==  '__main__':
    t = time.time()
    try:
        main()
    except BaseException as e:
        print ("\nFailure:", e)
    print ("\nExecution time: ", time.time() - t, "seconds")

以下のコマンドを実行。

$ python convert-regex2fst.py "今日も1日頑張る(ぞい+ZOI)"
Regular Expression:  今日も1日頑張る(ぞい+ZOI)

Minimised DFA:
states: {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12}
start state:  0
final states: [11]
transitions:
# 0 1 今 今 1

# 1 2 日 日 1

# 2 3 も も 1

# 3 4 1 1 1

# 4 5 日 日 1

# 5 6 頑 頑 1

# 6 7 張 張 1

# 7 8 る る 1

# 8 9 Z Z 1
# 8 10 ぞ ぞ 1

# 10 11 い い 1

# 9 12 O O 1

# 12 11 I I 1

# 11

Execution time:  0.009211540222167969 seconds

はい、できた。ちなみに、上述の正規表現では、「または」を表す記号に"|"ではなく"+"が使われていることに注意。

#から始まる行が、それぞれのアークを表している。最初の#で始まる行は、0番のノードから1番のノードにアークが張られており、入力シンボルは"今"、出力シンボルも"今"、重みは1ということを意味している。

これを、OpenFSTで扱える形に整形してみる。

今度は以下のようにコマンドを実行。

$ python convert-regex2fst.py "今日も1日頑張る(ぞい+ZOI)" | grep "^#" | perl -pe "s/# //g"
0 1 今 今 1
1 2 日 日 1
2 3 も も 1
3 4 1 1 1
4 5 日 日 1
5 6 頑 頑 1
6 7 張 張 1
7 8 る る 1
8 9 ぞ ぞ 1
8 10 Z Z 1
10 11 O O 1
11 12 I I 1
9 12 い い 1
12

はい、#で始まる行だけを抽出できた。これを、適当な名前で保存しておこう。

$ python convert-regex2fst.py "今日も1日頑張る(ぞい+ZOI)" | grep "^#" | perl -pe "s/# //g" > zoi

zoiという名前で保存した。

これを、OpenFSTのバイナリ形式にする。大まかな手順はこうだ。

  • OpenFSTには、入力シンボルと出力シンボルを定義するためのファイルが必要なので、まずこれを作る
  • 上述のアークの情報と、入力シンボル、出力シンボルから、FSTのバイナリを作る。
  • FSTを図示する。

なお、OpenFSTの使い方や、日本語対応などは以下の記事にまとめてあるので参考にされたし。

www.mahirokazuko.com

さて、手順通り進めよう。

まずは、入力シンボルと出力シンボルの準備だ。 これは、さきほど定義したcharlistをそのまま使いたい。だが、charlistにはイプシロンが定義されていないので、これを追加しよう。以下のようなコマンドを作った。

charlist2symbollist.sh

awk '{print $0, NR-1}' <(echo "<eps>" && cat charlist) # charlistの先頭に<eps>を追加したあと、先頭の文字からナンバリングして出力するだけ

これを使ってシンボルリストを作ろう。

./charlist2symbollist.sh  > charlist.ins   # insはinput symbolという意味で命名した
./charlist2symbollist.sh  > charlist.ous    # ousはoutput symbolという意味で命名した

次に、fstのバイナリを作ろう。

fstcompile2 charlist.ins charlist.ous zoi

これで、zoi.binができる。なお、fstcompile2は独自定義したコマンドである。上述の別記事に記載されているので見ていただきたい。

次に、このfstをgraphvizで描画してみよう。

fstdraw2 charlist.ins charlist.ous zoi.bin 500

これで、zoi.bin.pngができる。fstdraw2も例によって独自コマンドだ。

f:id:twx:20180901162533p:plain

たしかに、正規表現として正しい形になっていることが確認できる。

以上。今回は正規表現をOpenFSTで扱える形に変換してみました。

いい記事だと思っていただければ、以下の「★ + 」ボタンのクリックと、「読者になる」のボタンのクリックをお願いします!!

Kozuko Mahiro's Hacklog ―― Copyright © 2018 Mahiro Kazuko